From 6b83f363f8e237e712bb6eacb4663432a60ef377 Mon Sep 17 00:00:00 2001 From: Xuan Sang LE Date: Tue, 25 Sep 2018 17:51:56 +0200 Subject: [PATCH 01/15] using async scheduler for the relayd --- Makefile | 6 +- libs/scheduler.c | 301 +++++++++++++++++++++++++++++++++++++++++++++++ libs/scheduler.h | 97 +++++++++++++++ libs/utils.h | 3 + relay.c | 123 +++++++++++++++++++ relayd | 8 ++ 6 files changed, 537 insertions(+), 1 deletion(-) create mode 100644 libs/scheduler.c create mode 100644 libs/scheduler.h create mode 100644 relay.c create mode 100644 relayd diff --git a/Makefile b/Makefile index 3a60486..068ff55 100644 --- a/Makefile +++ b/Makefile @@ -16,7 +16,8 @@ LIBOBJS = libs/ini.o \ libs/utils.o \ libs/ws.o \ libs/sha1.o \ - libs/list.o + libs/list.o \ + libs/scheduler.o PLUGINSDEP = libs/plugin.o @@ -30,6 +31,9 @@ httpd: lib $(SERVER_O) $(CC) $(CFLAGS) $(SERVER_O) -o $(BUILDIRD)/httpd httpd.c $(SERVERLIB) cp antd $(BUILDIRD) +relay: lib $(SERVER_O) + $(CC) $(CFLAGS) $(SERVER_O) -o $(BUILDIRD)/relay relay.c $(SERVERLIB) + cp relayd $(BUILDIRD) lib: $(LIBOBJS) $(CC) $(CFLAGS) $(DB_LIB) $(SSL_LIB) -shared -o $(LIB_NAME).$(EXT) $(LIBOBJS) cp $(LIB_NAME).$(EXT) $(LIB_PATH$)/ diff --git a/libs/scheduler.c b/libs/scheduler.c new file mode 100644 index 0000000..0b04f1b --- /dev/null +++ b/libs/scheduler.c @@ -0,0 +1,301 @@ +#include "scheduler.h" + +/* +private data & methods +*/ +static antd_scheduler_t scheduler; + +static void enqueue(antd_task_t* task) +{ + // check if task is exist + int prio = task->priority>N_PRIORITY-1?N_PRIORITY-1:task->priority; + antd_task_queue_t q = scheduler.task_queue[prio]; + antd_task_item_t it = q; + while(it && it->task->id != task->id && it->next != NULL) + it = it->next; + if(it && it->task->id == task->id) + { + LOG("Task %d exists, ignore it\n", task->id); + return; + } + antd_task_item_t taski = (antd_task_item_t)malloc(sizeof *taski); + taski->task = task; + taski->next = NULL; + if(!it) // first task + { + scheduler.task_queue[prio] = taski; + } + else + { + it->next = taski; + } +} + +static int working() +{ + return scheduler.status; +} + +static void stop() +{ + pthread_mutex_lock(&scheduler.server_lock); + scheduler.status = 0; + pthread_mutex_unlock(&scheduler.server_lock); + for (int i = 0; i < scheduler.n_workers; i++) + pthread_join(scheduler.workers[i].pid, NULL); + if(scheduler.workers) free(scheduler.workers); +} + +static antd_task_item_t dequeue(int priority) +{ + int prio = priority>N_PRIORITY-1?N_PRIORITY-1:priority; + antd_task_item_t it = scheduler.task_queue[prio]; + if(it) + { + scheduler.task_queue[prio] = it->next; + } + return it; +} + +static antd_task_item_t next_task() +{ + antd_task_item_t it = NULL; + pthread_mutex_lock(&scheduler.server_lock); + for(int i = 0; i< N_PRIORITY; i++) + { + + it = dequeue(i); + if(it) break; + } + pthread_mutex_unlock(&scheduler.server_lock); + return it; +} + +static int available_workers() +{ + int n = 0; + pthread_mutex_lock(&scheduler.server_lock); + for(int i=0; i < scheduler.n_workers; i++) + if(scheduler.workers[i].status == 0) n++; + pthread_mutex_unlock(&scheduler.server_lock); + return n; +} + +static void work(void* data) +{ + antd_task_item_t it; + antd_worker_t* worker = (antd_worker_t*)data; + while(working()) + { + pthread_mutex_lock(&scheduler.server_lock); + worker->status = 0; + pthread_mutex_unlock(&scheduler.server_lock); + // fetch the next in queue + it = next_task(); + if(!it) continue; + //LOG("worker processing \n"); + pthread_mutex_lock(&scheduler.server_lock); + worker->status = 1; + pthread_mutex_unlock(&scheduler.server_lock); + // execute the task + antd_execute_task(it); + } +} + +static antd_callback_t* callback_of( void* (*callback)(void*) ) +{ + antd_callback_t* cb = NULL; + if(callback) + { + cb = (antd_callback_t*)malloc(sizeof *cb); + cb->handle = callback; + cb->next = NULL; + } + return cb; +} + +static void free_callback(antd_callback_t* cb) +{ + antd_callback_t* it = cb; + antd_callback_t* curr; + while(it) + { + curr = it; + it = it->next; + free(curr); + } +} + +static void enqueue_callback(antd_callback_t* cb, antd_callback_t* el) +{ + antd_callback_t* it = cb; + while(it && it->next != NULL) + it = it->next; + if(!it) return; // this should not happend + it->next = el; +} + +static void execute_callback(antd_task_t* task) +{ + antd_callback_t* cb = task->callback; + if(cb) + { + // call the first come call back + task->handle = cb->handle; + task->callback = task->callback->next; + free(cb); + antd_add_task(task); + } + else + { + free(task); + } +} + + +/* + Main API methods + init the main scheduler +*/ +void antd_scheduler_init(int n) +{ + time_t t; + srand((unsigned) time(&t)); + scheduler.n_workers = n; + scheduler.status = 1; + scheduler.workers = (antd_worker_t*)malloc(n*(sizeof(antd_worker_t))); + if(!scheduler.workers) + { + LOG("Cannot allocate memory for worker\n"); + exit(-1); + } + for(int i = 0; i < N_PRIORITY; i++) scheduler.task_queue[i] = NULL; + // create scheduler.workers + for(int i = 0; i < scheduler.n_workers;i++) + { + scheduler.workers[i].status = 0; + if (pthread_create(&scheduler.workers[i].pid , NULL,(void *(*)(void *))work, (void*)&scheduler.workers[i]) != 0) + { + scheduler.workers[i].status = -1; + perror("pthread_create: cannot create worker\n"); + } + } + LOG("Antd scheduler initialized with %d worker\n", scheduler.n_workers); +} +void antd_task_lock() +{ + pthread_mutex_lock(&scheduler.task_lock); +} +void antd_task_unlock() +{ + pthread_mutex_unlock(&scheduler.task_lock); +} +/* + destroy all pending task +*/ +void antd_scheduler_destroy() +{ + // free all the chains + antd_task_item_t it, curr; + for(int i=0; i < N_PRIORITY; i++) + { + it = scheduler.task_queue[i]; + while(it) + { + // first free the task + if(it->task && it->task->callback) free_callback(it->task->callback); + if(it->task) free(it->task); + // then free the placeholder + curr = it; + it = it->next; + free(curr); + } + } + stop(); +} + +/* + create a task +*/ +antd_task_t* antd_create_task(void* (*handle)(void*), void *data, void* (*callback)(void*)) +{ + antd_task_t* task = (antd_task_t*)malloc(sizeof *task); + task->stamp = (unsigned long)time(NULL); + task->id = rand(); + task->data = data; + task->handle = handle; + task->callback = callback_of(callback); + task->priority = NORMAL_PRIORITY; + return task; +} + +/* + scheduling a task +*/ +void antd_add_task(antd_task_t* task) +{ + pthread_mutex_lock(&scheduler.server_lock); + enqueue(task); + pthread_mutex_unlock(&scheduler.server_lock); +} + + +void antd_execute_task(antd_task_item_t taski) +{ + // execute the task + void *ret = (*(taski->task->handle))(taski->task->data); + // check the return data if it is a new task + if(!ret) + { + // call the first callback + execute_callback(taski->task); + free(taski); + } + else + { + antd_task_t* rtask = (antd_task_t*) ret; + if(taski->task->callback) + { + if(rtask->callback) + { + enqueue_callback(rtask->callback, taski->task->callback); + } + else + { + rtask->callback = taski->task->callback; + } + } + if(!rtask->handle) + { + // call the first callback + execute_callback(rtask); + free(taski->task); + free(taski); + } + else + { + antd_add_task(rtask); + free(taski->task); + free(taski); + } + } +} +int antd_scheduler_busy() +{ + int ret = 0; + if(available_workers() != scheduler.n_workers) return 1; + + pthread_mutex_lock(&scheduler.server_lock); + for(int i = 0; i < N_PRIORITY; i++) + if(scheduler.task_queue[i] != NULL) + { + ret = 1; + break; + } + pthread_mutex_unlock(&scheduler.server_lock); + return ret; +} +int antd_scheduler_status() +{ + return scheduler.status; +} \ No newline at end of file diff --git a/libs/scheduler.h b/libs/scheduler.h new file mode 100644 index 0000000..9d1ff65 --- /dev/null +++ b/libs/scheduler.h @@ -0,0 +1,97 @@ +#ifndef ANT_SCHEDULER +#define ANT_SCHEDULER + +#include "utils.h" +#include +// thread pool of workers +#define N_PRIORITY 10 +#define NORMAL_PRIORITY ((int)((N_PRIORITY - 1) / 2)) +#define LOW_PRIORITY (N_PRIORITY - 1) +#define HIGH_PRIORITY 0 + +// callback definition +typedef struct __callback_t{ + void* (*handle)(void*); + struct __callback_t * next; +} antd_callback_t; +// task definition +typedef struct { + /* + creation time of a task + */ + unsigned long stamp; + /* + unique id + */ + int id; + /* + priority from 0 to 9 + higher value is lower priority + */ + uint8_t priority; + /* + the callback + */ + void* (*handle)(void*); + antd_callback_t* callback; + /* + user data if any + */ + void * data; + +} antd_task_t; + +typedef struct { + pthread_t pid; + uint8_t status; // -1 quit, 0 available, 1 busy +} antd_worker_t; + + +typedef struct __task_item_t{ + antd_task_t* task; + struct __task_item_t* next; +}* antd_task_item_t; + +typedef antd_task_item_t antd_task_queue_t; + +typedef struct { + pthread_mutex_t server_lock; + pthread_mutex_t task_lock; + antd_task_queue_t task_queue[N_PRIORITY]; + uint8_t status; // 0 stop, 1 working + antd_worker_t* workers; + int n_workers; +} antd_scheduler_t; + +/* + init the main scheduler +*/ +void antd_scheduler_init(); +/* + destroy all pending task +*/ +void antd_scheduler_destroy(); + +/* + create a task +*/ +antd_task_t* antd_create_task(void* (*handle)(void*), void *data, void* (*callback)(void*)); + +/* + scheduling a task +*/ +void antd_add_task(antd_task_t*); + +void antd_task_lock(); +void antd_task_unlock(); +/* + Execute a task +*/ +int antd_scheduler_status(); +/* + execute and free a task a task +*/ +void antd_execute_task(antd_task_item_t); + +int antd_scheduler_busy(); +#endif \ No newline at end of file diff --git a/libs/utils.h b/libs/utils.h index 139df74..c9b8a3d 100644 --- a/libs/utils.h +++ b/libs/utils.h @@ -58,6 +58,9 @@ THE SOFTWARE. #else #define LOG(a,...) do{}while(0) #endif +// add this to the utils +#define UNUSED(x) (void)(x) + #define BUFFLEN 1024 #define HASHSIZE 1024 #define DHASHSIZE 50 diff --git a/relay.c b/relay.c new file mode 100644 index 0000000..455df91 --- /dev/null +++ b/relay.c @@ -0,0 +1,123 @@ +#include "http_server.h" +#include "libs/scheduler.h" +/* +this node is a relay from the http +to https +*/ +#define MATCH(s, n) strcmp(section, s) == 0 && strcmp(name, n) == 0 +int server_sock = -1; +void stop_serve(int dummy) { + UNUSED(dummy); + antd_scheduler_destroy(); + close(server_sock); +} +/* +HTTP/1.1 301 Moved Permanently +Location: http://www.example.org/ +Content-Type: text/html +Content-Length: 174 +*/ +void* antd_redirect(void* user_data) +{ + void** data = (void**)user_data; + void* client = data[0]; + char* host = (char*)data[1]; + __t(client,"%s", "HTTP/1.1 301 Moved Permanently"); + __t(client, "Location: https://%s", host); + __t(client, "%s", "Content-Type: text/html"); + __t(client, ""); + __t(client, "This page has moved to https://%s", host); + free(host); + free(user_data); + return antd_create_task(NULL,client, NULL); +} + +void* antd_free_client(void* client) +{ + antd_client_t * source = (antd_client_t *) client; + if(source->ip) free(source->ip); + close(source->sock); + free(client); + return NULL; +} + +void* antd_get_host(void * client) +{ + char buff[1024]; + char* line, *token; + char* host = NULL; + while((read_buf(client,buff,sizeof(buff))) && strcmp("\r\n",buff)) + { + line = buff; + trim(line, '\n'); + trim(line, '\r'); + token = strsep(&line, ":"); + trim(token,' '); + trim(line,' '); + if(token && strcasecmp(token,"Host")==0) + if(line) + { + host = strdup(line); + break; + } + } + if(!host) host = strdup("lxsang.me"); + void** data = (void**)malloc(2*(sizeof *data)); + data[0] = client; + data[1] = (void*)host; + LOG("Host is %s\n", host); + return antd_create_task(antd_redirect,data, NULL); +} + +int main(int argc, char* argv[]) +{ + UNUSED(argc); + UNUSED(argv); +// load the config first + unsigned port = 80; + int client_sock = -1; + struct sockaddr_in client_name; + socklen_t client_name_len = sizeof(client_name); + // ignore the broken PIPE error when writing + //or reading to/from a closed socked connection + signal(SIGPIPE, SIG_IGN); + signal(SIGABRT, SIG_IGN); + signal(SIGINT, stop_serve); + server_sock = startup(&port); + // 1 worker is + antd_scheduler_init(1); + LOG("httpd running on port %d\n", port); + + while (antd_scheduler_status()) + { + client_sock = accept(server_sock,(struct sockaddr *)&client_name,&client_name_len); + if (client_sock == -1) + { + perror("Cannot accept client request\n"); + continue; + } + antd_client_t* client = (antd_client_t*)malloc(sizeof(antd_client_t)); + // set timeout to socket + struct timeval timeout; + timeout.tv_sec = 0; + timeout.tv_usec = 500; + + if (setsockopt (client_sock, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout,sizeof(timeout)) < 0) + perror("setsockopt failed\n"); + + if (setsockopt (client_sock, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout,sizeof(timeout)) < 0) + perror("setsockopt failed\n"); + + /* + get the remote IP + */ + client->ip = NULL; + if (client_name.sin_family == AF_INET) + client->ip = strdup(inet_ntoa(client_name.sin_addr)); + client->sock = client_sock; + //accept_request(&client); + antd_add_task(antd_create_task(antd_get_host,(void*)client, antd_free_client )); + } + + return(0); +} \ No newline at end of file diff --git a/relayd b/relayd new file mode 100644 index 0000000..dd2f1cc --- /dev/null +++ b/relayd @@ -0,0 +1,8 @@ +#!/bin/sh +UNAME=`uname -s` + +if [ "$UNAME" = "Darwin" ]; then + DYLD_LIBRARY_PATH=$(dirname "$0")/plugins/ $(dirname "$0")/relay +else + LD_LIBRARY_PATH=$(dirname "$0")/plugins/ $(dirname "$0")/relay +fi From 90d44d782ffeb7077e356e6376925ec841003b7a Mon Sep 17 00:00:00 2001 From: Xuan Sang LE Date: Wed, 26 Sep 2018 10:30:04 +0200 Subject: [PATCH 02/15] use single thread scheduler and worker --- Makefile | 2 +- relayd => forward | 0 libs/scheduler.c | 79 ++++++++++++++++++++++++++--------------------- libs/scheduler.h | 1 + relay.c | 22 ++++++++----- 5 files changed, 60 insertions(+), 44 deletions(-) rename relayd => forward (100%) diff --git a/Makefile b/Makefile index 068ff55..bbb9af4 100644 --- a/Makefile +++ b/Makefile @@ -33,7 +33,7 @@ httpd: lib $(SERVER_O) relay: lib $(SERVER_O) $(CC) $(CFLAGS) $(SERVER_O) -o $(BUILDIRD)/relay relay.c $(SERVERLIB) - cp relayd $(BUILDIRD) + cp forward $(BUILDIRD) lib: $(LIBOBJS) $(CC) $(CFLAGS) $(DB_LIB) $(SSL_LIB) -shared -o $(LIB_NAME).$(EXT) $(LIBOBJS) cp $(LIB_NAME).$(EXT) $(LIB_PATH$)/ diff --git a/relayd b/forward similarity index 100% rename from relayd rename to forward diff --git a/libs/scheduler.c b/libs/scheduler.c index 0b04f1b..9751d31 100644 --- a/libs/scheduler.c +++ b/libs/scheduler.c @@ -80,28 +80,6 @@ static int available_workers() pthread_mutex_unlock(&scheduler.server_lock); return n; } - -static void work(void* data) -{ - antd_task_item_t it; - antd_worker_t* worker = (antd_worker_t*)data; - while(working()) - { - pthread_mutex_lock(&scheduler.server_lock); - worker->status = 0; - pthread_mutex_unlock(&scheduler.server_lock); - // fetch the next in queue - it = next_task(); - if(!it) continue; - //LOG("worker processing \n"); - pthread_mutex_lock(&scheduler.server_lock); - worker->status = 1; - pthread_mutex_unlock(&scheduler.server_lock); - // execute the task - antd_execute_task(it); - } -} - static antd_callback_t* callback_of( void* (*callback)(void*) ) { antd_callback_t* cb = NULL; @@ -151,33 +129,64 @@ static void execute_callback(antd_task_t* task) free(task); } } - +static void work(void* data) +{ + antd_worker_t* worker = (antd_worker_t*)data; + while(working()) + { + antd_attach_task(worker); + } +} /* Main API methods init the main scheduler */ + +/* +* assign task to a worker +*/ +void antd_attach_task(antd_worker_t* worker) +{ + antd_task_item_t it; + pthread_mutex_lock(&scheduler.server_lock); + worker->status = 0; + pthread_mutex_unlock(&scheduler.server_lock); + // fetch the next in queue + it = next_task(); + if(!it) return; + //LOG("worker processing \n"); + pthread_mutex_lock(&scheduler.server_lock); + worker->status = 1; + pthread_mutex_unlock(&scheduler.server_lock); + // execute the task + antd_execute_task(it); +} + void antd_scheduler_init(int n) { time_t t; srand((unsigned) time(&t)); scheduler.n_workers = n; scheduler.status = 1; - scheduler.workers = (antd_worker_t*)malloc(n*(sizeof(antd_worker_t))); - if(!scheduler.workers) - { - LOG("Cannot allocate memory for worker\n"); - exit(-1); - } for(int i = 0; i < N_PRIORITY; i++) scheduler.task_queue[i] = NULL; // create scheduler.workers - for(int i = 0; i < scheduler.n_workers;i++) + if(n > 0) { - scheduler.workers[i].status = 0; - if (pthread_create(&scheduler.workers[i].pid , NULL,(void *(*)(void *))work, (void*)&scheduler.workers[i]) != 0) + scheduler.workers = (antd_worker_t*)malloc(n*(sizeof(antd_worker_t))); + if(!scheduler.workers) { - scheduler.workers[i].status = -1; - perror("pthread_create: cannot create worker\n"); + LOG("Cannot allocate memory for worker\n"); + exit(-1); + } + for(int i = 0; i < scheduler.n_workers;i++) + { + scheduler.workers[i].status = 0; + if (pthread_create(&scheduler.workers[i].pid , NULL,(void *(*)(void *))work, (void*)&scheduler.workers[i]) != 0) + { + scheduler.workers[i].status = -1; + perror("pthread_create: cannot create worker\n"); + } } } LOG("Antd scheduler initialized with %d worker\n", scheduler.n_workers); @@ -197,6 +206,7 @@ void antd_scheduler_destroy() { // free all the chains antd_task_item_t it, curr; + stop(); for(int i=0; i < N_PRIORITY; i++) { it = scheduler.task_queue[i]; @@ -211,7 +221,6 @@ void antd_scheduler_destroy() free(curr); } } - stop(); } /* diff --git a/libs/scheduler.h b/libs/scheduler.h index 9d1ff65..f304776 100644 --- a/libs/scheduler.h +++ b/libs/scheduler.h @@ -94,4 +94,5 @@ int antd_scheduler_status(); void antd_execute_task(antd_task_item_t); int antd_scheduler_busy(); +void antd_attach_task(antd_worker_t* worker); #endif \ No newline at end of file diff --git a/relay.c b/relay.c index 455df91..d76648c 100644 --- a/relay.c +++ b/relay.c @@ -1,5 +1,6 @@ #include "http_server.h" #include "libs/scheduler.h" +#include /* this node is a relay from the http to https @@ -65,7 +66,7 @@ void* antd_get_host(void * client) void** data = (void**)malloc(2*(sizeof *data)); data[0] = client; data[1] = (void*)host; - LOG("Host is %s\n", host); + LOG("[%s] Request for: %s --> https://%s\n", ((antd_client_t*)client)->ip, host, host); return antd_create_task(antd_redirect,data, NULL); } @@ -84,23 +85,28 @@ int main(int argc, char* argv[]) signal(SIGABRT, SIG_IGN); signal(SIGINT, stop_serve); server_sock = startup(&port); - // 1 worker is - antd_scheduler_init(1); - LOG("httpd running on port %d\n", port); + struct timeval timeout; + timeout.tv_sec = 0; + timeout.tv_usec = 500; + // 0 worker + antd_scheduler_init(0); + antd_worker_t worker; + worker.status = 0; + // set server socket to non blocking + fcntl(server_sock, F_SETFL, O_NONBLOCK); /* Change the socket into non-blocking state */ + LOG("relayd running on port %d\n", port); while (antd_scheduler_status()) { + // execute task + antd_attach_task(&worker); client_sock = accept(server_sock,(struct sockaddr *)&client_name,&client_name_len); if (client_sock == -1) { - perror("Cannot accept client request\n"); continue; } antd_client_t* client = (antd_client_t*)malloc(sizeof(antd_client_t)); // set timeout to socket - struct timeval timeout; - timeout.tv_sec = 0; - timeout.tv_usec = 500; if (setsockopt (client_sock, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout,sizeof(timeout)) < 0) perror("setsockopt failed\n"); From d50b1352f656f11c5bc2a8f1d8f310666f80e82d Mon Sep 17 00:00:00 2001 From: Xuan Sang LE Date: Wed, 26 Sep 2018 11:07:48 +0200 Subject: [PATCH 03/15] TODO: remove extern variable --- libs/handle.c | 1 + libs/plugin.c | 3 +-- libs/plugin.h | 15 +++++++++++++-- 3 files changed, 15 insertions(+), 4 deletions(-) diff --git a/libs/handle.c b/libs/handle.c index e4becdd..670a0ab 100644 --- a/libs/handle.c +++ b/libs/handle.c @@ -134,6 +134,7 @@ int antd_close(void* src) //printf("Close sock %d\n", source->sock); int ret = close(source->sock); if(source->ip) free(source->ip); + // TODO remove this when using nonblocking server_config.connection--; LOG("Remaining connection %d\n", server_config.connection); free(src); diff --git a/libs/plugin.c b/libs/plugin.c index 24b38dc..b804861 100644 --- a/libs/plugin.c +++ b/libs/plugin.c @@ -2,7 +2,6 @@ plugin_header __plugin__; // private function -call __init__; void __init_plugin__(const char* pl,config_t* conf){ __plugin__.name = strdup(pl); __plugin__.dbpath= strdup(conf->db_path); @@ -12,7 +11,7 @@ void __init_plugin__(const char* pl,config_t* conf){ #ifdef USE_OPENSSL __plugin__.usessl = conf->usessl; #endif - if(__init__ != NULL) __init__(); + init(); }; #ifdef USE_DB diff --git a/libs/plugin.h b/libs/plugin.h index a51dbf0..8f5ac48 100644 --- a/libs/plugin.h +++ b/libs/plugin.h @@ -19,12 +19,21 @@ typedef struct { -typedef void(*call)(); +//typedef void(*call)(); #ifdef USE_DB typedef sqlite3* sqldb; #endif +/* +Two server, +Two configuration different +Does it work +Replace this by a accessing function +that execute the set value to +the header, instead of +exporting global variables +*/ extern plugin_header __plugin__; -extern call __init__; +//extern call __init__; #ifdef USE_DB @@ -36,6 +45,8 @@ char* route(const char*); char* htdocs(const char*); char* config_dir(); /*Default function for plugin*/ +// init the plugin +void init(); void handle(void*, const char*,const char*,dictionary); void __release(); #endif From cbd574cbc053885c7d98ba9125b3e1782a4a9951 Mon Sep 17 00:00:00 2001 From: Xuan Sang LE Date: Thu, 27 Sep 2018 15:00:19 +0200 Subject: [PATCH 04/15] fix scheduler bug --- libs/scheduler.c | 183 +++++++++++++++++++++++++++++++---------------- libs/scheduler.h | 19 ++++- relay.c | 12 ++-- 3 files changed, 143 insertions(+), 71 deletions(-) diff --git a/libs/scheduler.c b/libs/scheduler.c index 9751d31..8f7dc41 100644 --- a/libs/scheduler.c +++ b/libs/scheduler.c @@ -4,18 +4,15 @@ private data & methods */ static antd_scheduler_t scheduler; - -static void enqueue(antd_task_t* task) +static void enqueue(antd_task_queue_t* q, antd_task_t* task) { - // check if task is exist - int prio = task->priority>N_PRIORITY-1?N_PRIORITY-1:task->priority; - antd_task_queue_t q = scheduler.task_queue[prio]; - antd_task_item_t it = q; + antd_task_item_t it = *q; while(it && it->task->id != task->id && it->next != NULL) it = it->next; if(it && it->task->id == task->id) { LOG("Task %d exists, ignore it\n", task->id); + //assert(it->task->id == task->id ); return; } antd_task_item_t taski = (antd_task_item_t)malloc(sizeof *taski); @@ -23,7 +20,7 @@ static void enqueue(antd_task_t* task) taski->next = NULL; if(!it) // first task { - scheduler.task_queue[prio] = taski; + *q = taski; } else { @@ -33,26 +30,35 @@ static void enqueue(antd_task_t* task) static int working() { - return scheduler.status; + int stat; + pthread_mutex_lock(&scheduler.scheduler_lock); + stat = scheduler.status; + pthread_mutex_unlock(&scheduler.scheduler_lock); + return stat; } static void stop() { - pthread_mutex_lock(&scheduler.server_lock); + pthread_mutex_lock(&scheduler.scheduler_lock); scheduler.status = 0; - pthread_mutex_unlock(&scheduler.server_lock); + pthread_mutex_unlock(&scheduler.scheduler_lock); for (int i = 0; i < scheduler.n_workers; i++) pthread_join(scheduler.workers[i].pid, NULL); if(scheduler.workers) free(scheduler.workers); + // destroy all the mutex + pthread_mutex_destroy(&scheduler.scheduler_lock); + pthread_mutex_destroy(&scheduler.task_lock); + pthread_mutex_destroy(&scheduler.queue_lock); + pthread_mutex_destroy(&scheduler.worker_lock); } -static antd_task_item_t dequeue(int priority) +static antd_task_item_t dequeue(antd_task_queue_t* q) { - int prio = priority>N_PRIORITY-1?N_PRIORITY-1:priority; - antd_task_item_t it = scheduler.task_queue[prio]; + antd_task_item_t it = *q; if(it) { - scheduler.task_queue[prio] = it->next; + *q = it->next; + it->next = NULL; } return it; } @@ -60,26 +66,13 @@ static antd_task_item_t dequeue(int priority) static antd_task_item_t next_task() { antd_task_item_t it = NULL; - pthread_mutex_lock(&scheduler.server_lock); - for(int i = 0; i< N_PRIORITY; i++) - { - - it = dequeue(i); - if(it) break; - } - pthread_mutex_unlock(&scheduler.server_lock); + pthread_mutex_lock(&scheduler.queue_lock); + it = dequeue(&scheduler.workers_queue); + pthread_mutex_unlock(&scheduler.queue_lock); return it; } -static int available_workers() -{ - int n = 0; - pthread_mutex_lock(&scheduler.server_lock); - for(int i=0; i < scheduler.n_workers; i++) - if(scheduler.workers[i].status == 0) n++; - pthread_mutex_unlock(&scheduler.server_lock); - return n; -} + static antd_callback_t* callback_of( void* (*callback)(void*) ) { antd_callback_t* cb = NULL; @@ -138,28 +131,48 @@ static void work(void* data) } } +static void destroy_queue(antd_task_queue_t q) +{ + antd_task_item_t it, curr; + it = q; + while(it) + { + // first free the task + if(it->task && it->task->callback) free_callback(it->task->callback); + if(it->task) free(it->task); + // then free the placeholder + curr = it; + it = it->next; + free(curr); + } +} /* Main API methods init the main scheduler */ - +int antd_available_workers() +{ + int n = 0; + pthread_mutex_lock(&scheduler.worker_lock); + for(int i=0; i < scheduler.n_workers; i++) + if(scheduler.workers[i].status == 0) n++; + pthread_mutex_unlock(&scheduler.worker_lock); + return n; +} /* * assign task to a worker */ void antd_attach_task(antd_worker_t* worker) { antd_task_item_t it; - pthread_mutex_lock(&scheduler.server_lock); - worker->status = 0; - pthread_mutex_unlock(&scheduler.server_lock); - // fetch the next in queue + pthread_mutex_lock(&scheduler.worker_lock); it = next_task(); - if(!it) return; - //LOG("worker processing \n"); - pthread_mutex_lock(&scheduler.server_lock); - worker->status = 1; - pthread_mutex_unlock(&scheduler.server_lock); + worker->status = 0; + if(it) + worker->status = 1; + pthread_mutex_unlock(&scheduler.worker_lock); // execute the task + //LOG("task executed by worker %d\n", worker->pid); antd_execute_task(it); } @@ -169,6 +182,12 @@ void antd_scheduler_init(int n) srand((unsigned) time(&t)); scheduler.n_workers = n; scheduler.status = 1; + scheduler.workers_queue = NULL; + // init lock + pthread_mutex_init(&scheduler.scheduler_lock,NULL); + pthread_mutex_init(&scheduler.task_lock,NULL); + pthread_mutex_init(&scheduler.worker_lock,NULL); + pthread_mutex_init(&scheduler.queue_lock,NULL); for(int i = 0; i < N_PRIORITY; i++) scheduler.task_queue[i] = NULL; // create scheduler.workers if(n > 0) @@ -201,26 +220,17 @@ void antd_task_unlock() } /* destroy all pending task + pthread_mutex_lock(&scheduler.queue_lock); */ void antd_scheduler_destroy() { // free all the chains - antd_task_item_t it, curr; stop(); for(int i=0; i < N_PRIORITY; i++) { - it = scheduler.task_queue[i]; - while(it) - { - // first free the task - if(it->task && it->task->callback) free_callback(it->task->callback); - if(it->task) free(it->task); - // then free the placeholder - curr = it; - it = it->next; - free(curr); - } + destroy_queue(scheduler.task_queue[i]); } + destroy_queue(scheduler.workers_queue); } /* @@ -235,6 +245,7 @@ antd_task_t* antd_create_task(void* (*handle)(void*), void *data, void* (*callba task->handle = handle; task->callback = callback_of(callback); task->priority = NORMAL_PRIORITY; + task->type = LIGHT; return task; } @@ -243,14 +254,17 @@ antd_task_t* antd_create_task(void* (*handle)(void*), void *data, void* (*callba */ void antd_add_task(antd_task_t* task) { - pthread_mutex_lock(&scheduler.server_lock); - enqueue(task); - pthread_mutex_unlock(&scheduler.server_lock); + // check if task is exist + int prio = task->priority>N_PRIORITY-1?N_PRIORITY-1:task->priority; + pthread_mutex_lock(&scheduler.scheduler_lock); + enqueue(&scheduler.task_queue[prio], task); + pthread_mutex_unlock(&scheduler.scheduler_lock); } void antd_execute_task(antd_task_item_t taski) { + if(!taski) return; // execute the task void *ret = (*(taski->task->handle))(taski->task->data); // check the return data if it is a new task @@ -289,22 +303,69 @@ void antd_execute_task(antd_task_item_t taski) } } } -int antd_scheduler_busy() +int antd_has_pending_task() { int ret = 0; - if(available_workers() != scheduler.n_workers) return 1; - - pthread_mutex_lock(&scheduler.server_lock); + pthread_mutex_lock(&scheduler.scheduler_lock); for(int i = 0; i < N_PRIORITY; i++) if(scheduler.task_queue[i] != NULL) { ret = 1; break; } - pthread_mutex_unlock(&scheduler.server_lock); + pthread_mutex_unlock(&scheduler.scheduler_lock); + if(!ret) + { + pthread_mutex_lock(&scheduler.queue_lock); + ret = (scheduler.workers_queue != NULL); + pthread_mutex_unlock(&scheduler.queue_lock); + } + return ret; } +int antd_scheduler_busy() +{ + + if(antd_available_workers() != scheduler.n_workers) return 1; + //return 0; + return antd_has_pending_task(); +} int antd_scheduler_status() { return scheduler.status; +} +void antd_task_schedule() +{ + // fetch next task from the task_queue + antd_task_item_t it = NULL; + pthread_mutex_lock(&scheduler.scheduler_lock); + for(int i = 0; i< N_PRIORITY; i++) + { + + it = dequeue(&scheduler.task_queue[i]); + if(it) + break; + } + pthread_mutex_unlock(&scheduler.scheduler_lock); + if(!it) + { + return; + } + // has the task now + // check the type of tas + if(it->task->type == LIGHT || scheduler.n_workers <= 0) + { + // do it by myself + antd_execute_task(it); + } + else + { + // delegate to other workers by + //pushing to the worker queue + LOG("delegate to workers\n"); + pthread_mutex_lock(&scheduler.queue_lock); + enqueue(&scheduler.workers_queue, it->task); + free(it); + pthread_mutex_unlock(&scheduler.queue_lock); + } } \ No newline at end of file diff --git a/libs/scheduler.h b/libs/scheduler.h index f304776..de55a49 100644 --- a/libs/scheduler.h +++ b/libs/scheduler.h @@ -9,6 +9,7 @@ #define LOW_PRIORITY (N_PRIORITY - 1) #define HIGH_PRIORITY 0 +typedef enum { LIGHT, HEAVY } antd_task_type_t; // callback definition typedef struct __callback_t{ void* (*handle)(void*); @@ -37,8 +38,14 @@ typedef struct { /* user data if any */ - void * data; - + void * data; + /* + type of a task + light task is executed directly by + the leader + heavy tasks is delegated to workers + */ + antd_task_type_t type; } antd_task_t; typedef struct { @@ -55,9 +62,12 @@ typedef struct __task_item_t{ typedef antd_task_item_t antd_task_queue_t; typedef struct { - pthread_mutex_t server_lock; + pthread_mutex_t queue_lock; + pthread_mutex_t scheduler_lock; + pthread_mutex_t worker_lock; pthread_mutex_t task_lock; antd_task_queue_t task_queue[N_PRIORITY]; + antd_task_queue_t workers_queue; uint8_t status; // 0 stop, 1 working antd_worker_t* workers; int n_workers; @@ -95,4 +105,7 @@ void antd_execute_task(antd_task_item_t); int antd_scheduler_busy(); void antd_attach_task(antd_worker_t* worker); +void antd_task_schedule(); +int antd_available_workers(); +int antd_has_pending_task(); #endif \ No newline at end of file diff --git a/relay.c b/relay.c index d76648c..4274dcd 100644 --- a/relay.c +++ b/relay.c @@ -90,8 +90,6 @@ int main(int argc, char* argv[]) timeout.tv_usec = 500; // 0 worker antd_scheduler_init(0); - antd_worker_t worker; - worker.status = 0; // set server socket to non blocking fcntl(server_sock, F_SETFL, O_NONBLOCK); /* Change the socket into non-blocking state */ LOG("relayd running on port %d\n", port); @@ -99,7 +97,7 @@ int main(int argc, char* argv[]) while (antd_scheduler_status()) { // execute task - antd_attach_task(&worker); + antd_task_schedule(); client_sock = accept(server_sock,(struct sockaddr *)&client_name,&client_name_len); if (client_sock == -1) { @@ -108,11 +106,11 @@ int main(int argc, char* argv[]) antd_client_t* client = (antd_client_t*)malloc(sizeof(antd_client_t)); // set timeout to socket - if (setsockopt (client_sock, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout,sizeof(timeout)) < 0) - perror("setsockopt failed\n"); + //if (setsockopt (client_sock, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout,sizeof(timeout)) < 0) + // perror("setsockopt failed\n"); - if (setsockopt (client_sock, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout,sizeof(timeout)) < 0) - perror("setsockopt failed\n"); + //if (setsockopt (client_sock, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout,sizeof(timeout)) < 0) + // perror("setsockopt failed\n"); /* get the remote IP From 428c6ffb8934ebe46e60173673e6d31b950cecd3 Mon Sep 17 00:00:00 2001 From: Xuan Sang LE Date: Thu, 27 Sep 2018 17:18:31 +0200 Subject: [PATCH 05/15] fix race bug --- libs/scheduler.c | 69 ++++++++++++++++++++++++------------------------ libs/scheduler.h | 2 -- relay.c | 6 ++--- 3 files changed, 38 insertions(+), 39 deletions(-) diff --git a/libs/scheduler.c b/libs/scheduler.c index 8f7dc41..3db1213 100644 --- a/libs/scheduler.c +++ b/libs/scheduler.c @@ -146,19 +146,36 @@ static void destroy_queue(antd_task_queue_t q) free(curr); } } +static int antd_has_pending_task() +{ + int ret = 0; + + for(int i = 0; i < N_PRIORITY; i++) + if(scheduler.task_queue[i] != NULL) + { + ret = 1; + break; + } + if(!ret) + { + ret = (scheduler.workers_queue != NULL); + } + + return ret; +} +static int antd_available_workers() +{ + int n = 0; + //pthread_mutex_lock(&scheduler.worker_lock); + for(int i=0; i < scheduler.n_workers; i++) + if(scheduler.workers[i].status == 0) n++; + //pthread_mutex_unlock(&scheduler.worker_lock); + return n; +} /* Main API methods init the main scheduler */ -int antd_available_workers() -{ - int n = 0; - pthread_mutex_lock(&scheduler.worker_lock); - for(int i=0; i < scheduler.n_workers; i++) - if(scheduler.workers[i].status == 0) n++; - pthread_mutex_unlock(&scheduler.worker_lock); - return n; -} /* * assign task to a worker */ @@ -303,32 +320,17 @@ void antd_execute_task(antd_task_item_t taski) } } } -int antd_has_pending_task() -{ - int ret = 0; - pthread_mutex_lock(&scheduler.scheduler_lock); - for(int i = 0; i < N_PRIORITY; i++) - if(scheduler.task_queue[i] != NULL) - { - ret = 1; - break; - } - pthread_mutex_unlock(&scheduler.scheduler_lock); - if(!ret) - { - pthread_mutex_lock(&scheduler.queue_lock); - ret = (scheduler.workers_queue != NULL); - pthread_mutex_unlock(&scheduler.queue_lock); - } - - return ret; -} + int antd_scheduler_busy() { - - if(antd_available_workers() != scheduler.n_workers) return 1; - //return 0; - return antd_has_pending_task(); + pthread_mutex_lock(&scheduler.worker_lock); + pthread_mutex_lock(&scheduler.scheduler_lock); + pthread_mutex_lock(&scheduler.queue_lock); + int ret = (antd_available_workers() != scheduler.n_workers) || antd_has_pending_task(); + pthread_mutex_unlock(&scheduler.queue_lock); + pthread_mutex_unlock(&scheduler.scheduler_lock); + pthread_mutex_unlock(&scheduler.worker_lock); + return ret; } int antd_scheduler_status() { @@ -362,7 +364,6 @@ void antd_task_schedule() { // delegate to other workers by //pushing to the worker queue - LOG("delegate to workers\n"); pthread_mutex_lock(&scheduler.queue_lock); enqueue(&scheduler.workers_queue, it->task); free(it); diff --git a/libs/scheduler.h b/libs/scheduler.h index de55a49..e025fa7 100644 --- a/libs/scheduler.h +++ b/libs/scheduler.h @@ -106,6 +106,4 @@ void antd_execute_task(antd_task_item_t); int antd_scheduler_busy(); void antd_attach_task(antd_worker_t* worker); void antd_task_schedule(); -int antd_available_workers(); -int antd_has_pending_task(); #endif \ No newline at end of file diff --git a/relay.c b/relay.c index 4274dcd..48b64e4 100644 --- a/relay.c +++ b/relay.c @@ -85,9 +85,9 @@ int main(int argc, char* argv[]) signal(SIGABRT, SIG_IGN); signal(SIGINT, stop_serve); server_sock = startup(&port); - struct timeval timeout; - timeout.tv_sec = 0; - timeout.tv_usec = 500; + //struct timeval timeout; + //timeout.tv_sec = 0; + //timeout.tv_usec = 500; // 0 worker antd_scheduler_init(0); // set server socket to non blocking From 60a2298e62af40886e93efb6075aeacd9fa7a72b Mon Sep 17 00:00:00 2001 From: lxsang Date: Mon, 1 Oct 2018 22:49:20 +0200 Subject: [PATCH 06/15] fix sync --- libs/scheduler.c | 216 +++++++++++++++-------------------------------- libs/scheduler.h | 53 +++++------- relay.c | 11 +-- 3 files changed, 93 insertions(+), 187 deletions(-) diff --git a/libs/scheduler.c b/libs/scheduler.c index 3db1213..343fed7 100644 --- a/libs/scheduler.c +++ b/libs/scheduler.c @@ -1,20 +1,10 @@ #include "scheduler.h" -/* -private data & methods -*/ -static antd_scheduler_t scheduler; static void enqueue(antd_task_queue_t* q, antd_task_t* task) { antd_task_item_t it = *q; - while(it && it->task->id != task->id && it->next != NULL) + while(it && it->next != NULL) it = it->next; - if(it && it->task->id == task->id) - { - LOG("Task %d exists, ignore it\n", task->id); - //assert(it->task->id == task->id ); - return; - } antd_task_item_t taski = (antd_task_item_t)malloc(sizeof *taski); taski->task = task; taski->next = NULL; @@ -28,28 +18,17 @@ static void enqueue(antd_task_queue_t* q, antd_task_t* task) } } -static int working() -{ - int stat; - pthread_mutex_lock(&scheduler.scheduler_lock); - stat = scheduler.status; - pthread_mutex_unlock(&scheduler.scheduler_lock); - return stat; -} -static void stop() +static void stop(antd_scheduler_t* scheduler) { - pthread_mutex_lock(&scheduler.scheduler_lock); - scheduler.status = 0; - pthread_mutex_unlock(&scheduler.scheduler_lock); - for (int i = 0; i < scheduler.n_workers; i++) - pthread_join(scheduler.workers[i].pid, NULL); - if(scheduler.workers) free(scheduler.workers); + scheduler->status = 0; + for (int i = 0; i < scheduler->n_workers; i++) + pthread_join(scheduler->workers[i], NULL); + if(scheduler->workers) free(scheduler->workers); // destroy all the mutex - pthread_mutex_destroy(&scheduler.scheduler_lock); - pthread_mutex_destroy(&scheduler.task_lock); - pthread_mutex_destroy(&scheduler.queue_lock); - pthread_mutex_destroy(&scheduler.worker_lock); + pthread_mutex_destroy(&scheduler->scheduler_lock); + pthread_mutex_destroy(&scheduler->worker_lock); + pthread_mutex_destroy(&scheduler->pending_lock); } static antd_task_item_t dequeue(antd_task_queue_t* q) @@ -63,15 +42,6 @@ static antd_task_item_t dequeue(antd_task_queue_t* q) return it; } -static antd_task_item_t next_task() -{ - antd_task_item_t it = NULL; - pthread_mutex_lock(&scheduler.queue_lock); - it = dequeue(&scheduler.workers_queue); - pthread_mutex_unlock(&scheduler.queue_lock); - return it; -} - static antd_callback_t* callback_of( void* (*callback)(void*) ) { @@ -106,7 +76,7 @@ static void enqueue_callback(antd_callback_t* cb, antd_callback_t* el) it->next = el; } -static void execute_callback(antd_task_t* task) +static void execute_callback(antd_scheduler_t* scheduler, antd_task_t* task) { antd_callback_t* cb = task->callback; if(cb) @@ -115,21 +85,13 @@ static void execute_callback(antd_task_t* task) task->handle = cb->handle; task->callback = task->callback->next; free(cb); - antd_add_task(task); + antd_add_task(scheduler, task); } else { free(task); } } -static void work(void* data) -{ - antd_worker_t* worker = (antd_worker_t*)data; - while(working()) - { - antd_attach_task(worker); - } -} static void destroy_queue(antd_task_queue_t q) { @@ -146,108 +108,68 @@ static void destroy_queue(antd_task_queue_t q) free(curr); } } -static int antd_has_pending_task() +static void work(antd_scheduler_t* scheduler) { - int ret = 0; - - for(int i = 0; i < N_PRIORITY; i++) - if(scheduler.task_queue[i] != NULL) - { - ret = 1; - break; - } - if(!ret) + while(scheduler->status) { - ret = (scheduler.workers_queue != NULL); + antd_task_item_t it; + pthread_mutex_lock(&scheduler->worker_lock); + it = dequeue(&scheduler->workers_queue); + pthread_mutex_unlock(&scheduler->worker_lock); + // execute the task + //LOG("task executed by worker %d\n", worker->pid); + antd_execute_task(scheduler, it); } - - return ret; -} -static int antd_available_workers() -{ - int n = 0; - //pthread_mutex_lock(&scheduler.worker_lock); - for(int i=0; i < scheduler.n_workers; i++) - if(scheduler.workers[i].status == 0) n++; - //pthread_mutex_unlock(&scheduler.worker_lock); - return n; } + /* Main API methods init the main scheduler */ -/* -* assign task to a worker -*/ -void antd_attach_task(antd_worker_t* worker) -{ - antd_task_item_t it; - pthread_mutex_lock(&scheduler.worker_lock); - it = next_task(); - worker->status = 0; - if(it) - worker->status = 1; - pthread_mutex_unlock(&scheduler.worker_lock); - // execute the task - //LOG("task executed by worker %d\n", worker->pid); - antd_execute_task(it); -} -void antd_scheduler_init(int n) +void antd_scheduler_init(antd_scheduler_t* scheduler, int n) { - time_t t; - srand((unsigned) time(&t)); - scheduler.n_workers = n; - scheduler.status = 1; - scheduler.workers_queue = NULL; + scheduler->n_workers = n; + scheduler->status = 1; + scheduler->workers_queue = NULL; + scheduler->pending_task = 0 ; // init lock - pthread_mutex_init(&scheduler.scheduler_lock,NULL); - pthread_mutex_init(&scheduler.task_lock,NULL); - pthread_mutex_init(&scheduler.worker_lock,NULL); - pthread_mutex_init(&scheduler.queue_lock,NULL); - for(int i = 0; i < N_PRIORITY; i++) scheduler.task_queue[i] = NULL; + pthread_mutex_init(&scheduler->scheduler_lock,NULL); + pthread_mutex_init(&scheduler->worker_lock, NULL); + pthread_mutex_init(&scheduler->pending_lock, NULL); + for(int i = 0; i < N_PRIORITY; i++) scheduler->task_queue[i] = NULL; // create scheduler.workers if(n > 0) { - scheduler.workers = (antd_worker_t*)malloc(n*(sizeof(antd_worker_t))); - if(!scheduler.workers) + scheduler->workers = (pthread_t*)malloc(n*(sizeof(pthread_t))); + if(!scheduler->workers) { LOG("Cannot allocate memory for worker\n"); exit(-1); } - for(int i = 0; i < scheduler.n_workers;i++) + for(int i = 0; i < scheduler->n_workers;i++) { - scheduler.workers[i].status = 0; - if (pthread_create(&scheduler.workers[i].pid , NULL,(void *(*)(void *))work, (void*)&scheduler.workers[i]) != 0) + if (pthread_create(&scheduler->workers[i], NULL,(void *(*)(void *))work, (void*)scheduler) != 0) { - scheduler.workers[i].status = -1; perror("pthread_create: cannot create worker\n"); } } } - LOG("Antd scheduler initialized with %d worker\n", scheduler.n_workers); -} -void antd_task_lock() -{ - pthread_mutex_lock(&scheduler.task_lock); -} -void antd_task_unlock() -{ - pthread_mutex_unlock(&scheduler.task_lock); + LOG("Antd scheduler initialized with %d worker\n", scheduler->n_workers); } /* destroy all pending task pthread_mutex_lock(&scheduler.queue_lock); */ -void antd_scheduler_destroy() +void antd_scheduler_destroy(antd_scheduler_t* scheduler) { // free all the chains - stop(); + stop(scheduler); for(int i=0; i < N_PRIORITY; i++) { - destroy_queue(scheduler.task_queue[i]); + destroy_queue(scheduler->task_queue[i]); } - destroy_queue(scheduler.workers_queue); + destroy_queue(scheduler->workers_queue); } /* @@ -257,7 +179,6 @@ antd_task_t* antd_create_task(void* (*handle)(void*), void *data, void* (*callba { antd_task_t* task = (antd_task_t*)malloc(sizeof *task); task->stamp = (unsigned long)time(NULL); - task->id = rand(); task->data = data; task->handle = handle; task->callback = callback_of(callback); @@ -269,26 +190,30 @@ antd_task_t* antd_create_task(void* (*handle)(void*), void *data, void* (*callba /* scheduling a task */ -void antd_add_task(antd_task_t* task) +void antd_add_task(antd_scheduler_t* scheduler, antd_task_t* task) { // check if task is exist int prio = task->priority>N_PRIORITY-1?N_PRIORITY-1:task->priority; - pthread_mutex_lock(&scheduler.scheduler_lock); - enqueue(&scheduler.task_queue[prio], task); - pthread_mutex_unlock(&scheduler.scheduler_lock); + pthread_mutex_lock(&scheduler->scheduler_lock); + enqueue(&scheduler->task_queue[prio], task); + pthread_mutex_unlock(&scheduler->scheduler_lock); + pthread_mutex_lock(&scheduler->pending_lock); + scheduler->pending_task++; + pthread_mutex_unlock(&scheduler->pending_lock); } -void antd_execute_task(antd_task_item_t taski) +void antd_execute_task(antd_scheduler_t* scheduler, antd_task_item_t taski) { - if(!taski) return; + if(!taski) + return; // execute the task void *ret = (*(taski->task->handle))(taski->task->data); // check the return data if it is a new task if(!ret) { // call the first callback - execute_callback(taski->task); + execute_callback(scheduler, taski->task); free(taski); } else @@ -308,65 +233,58 @@ void antd_execute_task(antd_task_item_t taski) if(!rtask->handle) { // call the first callback - execute_callback(rtask); + execute_callback(scheduler, rtask); free(taski->task); free(taski); } else { - antd_add_task(rtask); + antd_add_task(scheduler, rtask); free(taski->task); free(taski); } } + pthread_mutex_lock(&scheduler->pending_lock); + scheduler->pending_task--; + pthread_mutex_unlock(&scheduler->pending_lock); } -int antd_scheduler_busy() +int antd_scheduler_busy(antd_scheduler_t* scheduler) { - pthread_mutex_lock(&scheduler.worker_lock); - pthread_mutex_lock(&scheduler.scheduler_lock); - pthread_mutex_lock(&scheduler.queue_lock); - int ret = (antd_available_workers() != scheduler.n_workers) || antd_has_pending_task(); - pthread_mutex_unlock(&scheduler.queue_lock); - pthread_mutex_unlock(&scheduler.scheduler_lock); - pthread_mutex_unlock(&scheduler.worker_lock); - return ret; + return scheduler->pending_task != 0; } -int antd_scheduler_status() -{ - return scheduler.status; -} -void antd_task_schedule() + +void antd_task_schedule(antd_scheduler_t* scheduler) { // fetch next task from the task_queue antd_task_item_t it = NULL; - pthread_mutex_lock(&scheduler.scheduler_lock); + pthread_mutex_lock(&scheduler->scheduler_lock); for(int i = 0; i< N_PRIORITY; i++) { - it = dequeue(&scheduler.task_queue[i]); + it = dequeue(&scheduler->task_queue[i]); if(it) break; } - pthread_mutex_unlock(&scheduler.scheduler_lock); + pthread_mutex_unlock(&scheduler->scheduler_lock); if(!it) { return; } // has the task now // check the type of tas - if(it->task->type == LIGHT || scheduler.n_workers <= 0) + if(it->task->type == LIGHT || scheduler->n_workers <= 0) { // do it by myself - antd_execute_task(it); + antd_execute_task( scheduler, it); } else { // delegate to other workers by //pushing to the worker queue - pthread_mutex_lock(&scheduler.queue_lock); - enqueue(&scheduler.workers_queue, it->task); + pthread_mutex_lock(&scheduler->worker_lock); + enqueue(&scheduler->workers_queue, it->task); + pthread_mutex_unlock(&scheduler->worker_lock); free(it); - pthread_mutex_unlock(&scheduler.queue_lock); } } \ No newline at end of file diff --git a/libs/scheduler.h b/libs/scheduler.h index e025fa7..4c9747d 100644 --- a/libs/scheduler.h +++ b/libs/scheduler.h @@ -3,7 +3,7 @@ #include "utils.h" #include -// thread pool of workers + #define N_PRIORITY 10 #define NORMAL_PRIORITY ((int)((N_PRIORITY - 1) / 2)) #define LOW_PRIORITY (N_PRIORITY - 1) @@ -22,11 +22,7 @@ typedef struct { */ unsigned long stamp; /* - unique id - */ - int id; - /* - priority from 0 to 9 + priority from 0 to N_PRIORITY - 1 higher value is lower priority */ uint8_t priority; @@ -41,18 +37,12 @@ typedef struct { void * data; /* type of a task - light task is executed directly by - the leader - heavy tasks is delegated to workers + light tasks are executed directly + heavy tasks are delegated to workers */ antd_task_type_t type; } antd_task_t; -typedef struct { - pthread_t pid; - uint8_t status; // -1 quit, 0 available, 1 busy -} antd_worker_t; - typedef struct __task_item_t{ antd_task_t* task; @@ -62,25 +52,25 @@ typedef struct __task_item_t{ typedef antd_task_item_t antd_task_queue_t; typedef struct { - pthread_mutex_t queue_lock; pthread_mutex_t scheduler_lock; pthread_mutex_t worker_lock; - pthread_mutex_t task_lock; + pthread_mutex_t pending_lock; antd_task_queue_t task_queue[N_PRIORITY]; antd_task_queue_t workers_queue; uint8_t status; // 0 stop, 1 working - antd_worker_t* workers; + pthread_t* workers; int n_workers; + int pending_task; } antd_scheduler_t; /* init the main scheduler */ -void antd_scheduler_init(); +void antd_scheduler_init(antd_scheduler_t*, int); /* destroy all pending task */ -void antd_scheduler_destroy(); +void antd_scheduler_destroy(antd_scheduler_t*); /* create a task @@ -88,22 +78,19 @@ void antd_scheduler_destroy(); antd_task_t* antd_create_task(void* (*handle)(void*), void *data, void* (*callback)(void*)); /* - scheduling a task + add a task */ -void antd_add_task(antd_task_t*); - -void antd_task_lock(); -void antd_task_unlock(); -/* - Execute a task -*/ -int antd_scheduler_status(); +void antd_add_task(antd_scheduler_t*, antd_task_t*); /* execute and free a task a task */ -void antd_execute_task(antd_task_item_t); - -int antd_scheduler_busy(); -void antd_attach_task(antd_worker_t* worker); -void antd_task_schedule(); +void antd_execute_task(antd_scheduler_t*, antd_task_item_t); +/* + scheduler status +*/ +int antd_scheduler_busy(antd_scheduler_t*); +/* + schedule a task +*/ +void antd_task_schedule(antd_scheduler_t*); #endif \ No newline at end of file diff --git a/relay.c b/relay.c index 48b64e4..0fe8327 100644 --- a/relay.c +++ b/relay.c @@ -1,6 +1,7 @@ #include "http_server.h" #include "libs/scheduler.h" #include +static antd_scheduler_t scheduler; /* this node is a relay from the http to https @@ -9,7 +10,7 @@ to https int server_sock = -1; void stop_serve(int dummy) { UNUSED(dummy); - antd_scheduler_destroy(); + antd_scheduler_destroy(&scheduler); close(server_sock); } /* @@ -89,15 +90,15 @@ int main(int argc, char* argv[]) //timeout.tv_sec = 0; //timeout.tv_usec = 500; // 0 worker - antd_scheduler_init(0); + antd_scheduler_init(&scheduler, 0); // set server socket to non blocking fcntl(server_sock, F_SETFL, O_NONBLOCK); /* Change the socket into non-blocking state */ LOG("relayd running on port %d\n", port); - while (antd_scheduler_status()) + while (scheduler.status) { // execute task - antd_task_schedule(); + antd_task_schedule(&scheduler); client_sock = accept(server_sock,(struct sockaddr *)&client_name,&client_name_len); if (client_sock == -1) { @@ -120,7 +121,7 @@ int main(int argc, char* argv[]) client->ip = strdup(inet_ntoa(client_name.sin_addr)); client->sock = client_sock; //accept_request(&client); - antd_add_task(antd_create_task(antd_get_host,(void*)client, antd_free_client )); + antd_add_task(&scheduler, antd_create_task(antd_get_host,(void*)client, antd_free_client )); } return(0); From 3447b07fc6116f2aad3ed989a09570dd9587df10 Mon Sep 17 00:00:00 2001 From: lxsang Date: Wed, 3 Oct 2018 23:42:42 +0200 Subject: [PATCH 07/15] use async for server --- http_server.c | 5 +++++ http_server.h | 1 + httpd.c | 27 ++++++++++++++++----------- libs/handle.h | 5 +++++ 4 files changed, 27 insertions(+), 11 deletions(-) diff --git a/http_server.c b/http_server.c index ea34a3d..57cf962 100644 --- a/http_server.c +++ b/http_server.c @@ -162,6 +162,11 @@ end: antd_close(client); } +void* finish_request(void* data) +{ + return NULL; +} + int rule_check(const char*k, const char* v, const char* host, const char* _url, const char* _query, char* buf) { // first perfom rule check on host, if not success, perform on url diff --git a/http_server.h b/http_server.h index 3636fd9..8bc2259 100644 --- a/http_server.h +++ b/http_server.h @@ -24,6 +24,7 @@ extern config_t server_config; void accept_request(void*); +void* finish_request(void*); void cat(void*, FILE *); void cannot_execute(void*); void error_die(const char *); diff --git a/httpd.c b/httpd.c index 7a5bc15..5777ca4 100644 --- a/httpd.c +++ b/httpd.c @@ -2,6 +2,9 @@ #include #include "http_server.h" #include "libs/ini.h" +#include "libs/scheduler.h" +#include +static antd_scheduler_t scheduler; #define MATCH(s, n) strcmp(section, s) == 0 && strcmp(name, n) == 0 int server_sock = -1; @@ -175,10 +178,12 @@ void load_config(const char* file) init_file_system(); } void stop_serve(int dummy) { + UNUSED(dummy); list_free(&(server_config.rules)); freedict(server_config.handlers); LOG("Unclosed connection: %d\n", server_config.connection); unload_all_plugin(); + antd_scheduler_destroy(&scheduler); #ifdef USE_OPENSSL SSL_CTX_free(ctx); #endif @@ -216,21 +221,19 @@ int main(int argc, char* argv[]) server_sock = startup(&port); LOG("httpd running on port %d\n", port); - - while (1) + // default to 4 workers + antd_scheduler_init(&scheduler, 4); + fcntl(server_sock, F_SETFL, O_NONBLOCK); + while (scheduler.status) { - if( server_config.connection >= server_config.maxcon ) - { - LOG("Too many unclosed connection (%d). Wait for it\n", server_config.connection); - continue; - } - antd_client_t* client = (antd_client_t*)malloc(sizeof(antd_client_t)); + antd_task_schedule(&scheduler); client_sock = accept(server_sock,(struct sockaddr *)&client_name,&client_name_len); if (client_sock == -1) { - perror("Cannot accept client request\n"); + //perror("Cannot accept client request\n"); continue; } + antd_client_t* client = (antd_client_t*)malloc(sizeof(antd_client_t)); /* get the remote IP */ @@ -273,7 +276,9 @@ int main(int argc, char* argv[]) } } #endif - if (pthread_create(&newthread , NULL,(void *(*)(void *))accept_request, (void *)client) != 0) + // create callback for the server + antd_add_task(&scheduler, antd_create_task(accept_request,(void*)client, finish_request )); + /*if (pthread_create(&newthread , NULL,(void *(*)(void *))accept_request, (void *)client) != 0) { perror("pthread_create"); antd_close(client); @@ -282,7 +287,7 @@ int main(int argc, char* argv[]) { //reclaim the stack data when thread finish pthread_detach(newthread) ; - } + }*/ //accept_request(&client); } diff --git a/libs/handle.h b/libs/handle.h index beea497..234cfeb 100644 --- a/libs/handle.h +++ b/libs/handle.h @@ -54,6 +54,11 @@ typedef struct{ char* ip; } antd_client_t; +typedef struct { + antd_client_t* client; + dictionary request; +} antd_request_t; + int response(void*, const char*); void ctype(void*,const char*); void redirect(void*,const char*); From 5477f54f60121fda2dada833e3e90532a514e64d Mon Sep 17 00:00:00 2001 From: lxsang Date: Thu, 4 Oct 2018 19:47:31 +0200 Subject: [PATCH 08/15] first working but buggy version --- http_server.c | 593 +++++++++++++++++++------------------------------- http_server.h | 16 +- httpd.c | 5 +- libs/handle.c | 37 +++- libs/handle.h | 3 + 5 files changed, 272 insertions(+), 382 deletions(-) diff --git a/http_server.c b/http_server.c index 57cf962..2da4dce 100644 --- a/http_server.c +++ b/http_server.c @@ -1,120 +1,121 @@ #include "http_server.h" static pthread_mutex_t server_mux = PTHREAD_MUTEX_INITIALIZER; -/**********************************************************************/ -/* A request has caused a call to accept() on the server port to -* return. Process the request appropriately. -* Parameters: the socket connected to the client */ -/**********************************************************************/ -void accept_request(void* client) -{ - char buf[1024]; - int numchars; - char method[255]; - char url[4096]; - char path[1024]; - char* token; - char *line; - char* oldurl = NULL; - char* tmp = NULL; - dictionary rq = NULL; - size_t i, j; - struct stat st; - //char *query_string = NULL; - //LOG("SOCK IS %d\n", ((antd_client_t*)client)->sock); - numchars = read_buf(client, buf, sizeof(buf)); - if(numchars <= 0) +void* accept_request(void* client) +{ + int count; + char buf[BUFFLEN]; + antd_request_t* request; + antd_task_t* task; + char* token = NULL; + char* line = NULL; + request = (antd_request_t*)malloc(sizeof(*request)); + request->client = client; + request->request = dict(); + count = read_buf(client, buf, sizeof(buf)); + task = antd_create_task(NULL,(void*)request,NULL); + task->priority++; + if(count <= 0) { unknow(client); - goto end; + return task; } - i = 0; j = 0; - while (j < numchars && !ISspace(buf[j]) && (i < sizeof(method) - 1)) + line = buf; + // get the method string + token = strsep(&line," "); + if(!line) { - method[i] = buf[j]; - i++; j++; + unknow(client); + return task; } - method[i] = '\0'; - if (strcasecmp(method, "GET") && strcasecmp(method, "POST")) + trim(token,' '); + trim(line,' '); + dput(request->request, "METHOD", strdup(token)); + // get the request + token = strsep(&line, " "); + if(!line) { - LOG("METHOD NOT FOUND %s\n", method); - // unimplemented - //while(get_line(client, buf, sizeof(buf)) > 0) printf("%s\n",buf ); - unimplemented(client); - //antd_close(client); - goto end; + unknow(client); + return task; } + trim(token,' '); + trim(line,' '); + trim(line, '\n'); + trim(line, '\r'); + dput(request->request, "PROTOCOL", strdup(line)); + dput(request->request, "REQUEST_QUERY", strdup(token)); + line = token; + token = strsep(&line, "?"); + dput(request->request, "REQUEST_PATH", strdup(token)); + // decode request + // now return the task + task->handle = decode_request_header; + return task; +} - - i = 0; - while (ISspace(buf[j]) && (j < sizeof(buf))) - j++; - while (!ISspace(buf[j]) && (i < sizeof(url) - 1) && (j < sizeof(buf))) - { - url[i] = buf[j]; - i++; j++; - } - url[i] = '\0'; - - - - oldurl = strdup(url); - tmp = strchr(oldurl,'?'); - if(tmp) - *tmp = '\0'; - - rq = decode_request(client, method, url); - if(rq == NULL) - { - badrequest(client); - goto end; - } +void* resolve_request(void* data) +{ + struct stat st; + char path[2*BUFFLEN]; + antd_request_t* rq = (antd_request_t*) data; + antd_task_t* task = antd_create_task(NULL,(void*)rq,NULL); + task->priority++; + char* url = (char*)dvalue(rq->request, "RESOURCE_PATH"); + char* newurl = NULL; + char* rqp = (char*)dvalue(rq->request, "REQUEST_PATH"); sprintf(path, server_config.htdocs); strcat(path, url); LOG("Path is : %s \n", path); //if (path[strlen(path) - 1] == '/') // strcat(path, "index.html"); if (stat(path, &st) == -1) { - if(execute_plugin(client,oldurl,method,rq) < 0) - not_found(client); + //if(execute_plugin(rq->client,rqp,method,rq) < 0) + // not_found(client); + LOG("execute plugin \n"); + free(task); + return execute_plugin(rq, rqp); } else { if (S_ISDIR(st.st_mode)) { - int l = strlen(path); - int ul = strlen(url); strcat(path, "/index.html"); if(stat(path, &st) == -1) { association it; for_each_assoc(it, server_config.handlers) { - path[l] = '\0'; - url[ul] = '\0'; - strcat(url,"/index."); - strcat(path, "/index."); - strcat(url,it->key); - strcat(path, it->key); - if(stat(path, &st) == 0) + newurl = __s("%s/index.%s", url, it->key); + memset(path, 0, sizeof(path)); + strcat(path, server_config.htdocs); + strcat(path, newurl); + if(stat(path, &st) != 0) + { + free(newurl); + newurl = NULL; + } + else { - l = -1; - i = HASHSIZE; break; } } - if(l!= -1) + if(!newurl) { - not_found(client); - goto end; + notfound(rq->client); + return task; } + if(url) free(url); + url = newurl; + dput(rq->request, "RESOURCE_PATH", url); } } + dput(rq->request, "ABS_RESOURCE_PATH", strdup(path)); // check if the mime is supported // if the mime is not supported // find an handler plugin to process it // if the plugin is not found, forbidden access to the file should be sent char* mime_type = mime(path); + dput(rq->request, "RESOURCE_MIME", strdup(mime_type)); if(strcmp(mime_type,"application/octet-stream") == 0) { char * ex = ext(path); @@ -122,48 +123,35 @@ void accept_request(void* client) if(ex) free(ex); if(h) { - sprintf(buf,"/%s%s",h,url); - LOG("WARNING::::Access octetstream via handler %s\n", buf); - if(execute_plugin(client,buf,method,rq) < 0) - cannot_execute(client); + sprintf(path,"/%s%s",h,url); + LOG("WARNING::::Access octetstream via handler %s\n", path); + //if(execute_plugin(client,buf,method,rq) < 0) + // cannot_execute(client); + free(task); + return execute_plugin(rq, path); } else - unknow(client); + unknow(rq->client); } else { - ctype(client,mime_type); - // if the mime is supported, send the file - serve_file(client, path); - //response(client,"this is the file"); - } + task->type = HEAVY; + task->handle = serve_file; + } + return task; } -end: - if(oldurl) free(oldurl); - if(rq) - { - dictionary subdict; - subdict = (dictionary)dvalue(rq, "__xheader__"); - if(subdict) - { - freedict(subdict); - dput(rq, "__xheader__", NULL); - } - - subdict = (dictionary)dvalue(rq, "cookie"); - if(subdict) - { - freedict(subdict); - dput(rq, "cookie", NULL); - } - freedict(rq); - } - antd_close(client); } void* finish_request(void* data) { + if(!data) return NULL; + LOG("Close request\n"); + antd_request_t* rq = (antd_request_t*)data; + // free all other thing + if(rq->request) freedict(rq->request); + antd_close(rq->client); + free(rq); return NULL; } @@ -237,51 +225,6 @@ int rule_check(const char*k, const char* v, const char* host, const char* _url, free(query); return 1; } -/**********************************************************************/ -/* Put the entire contents of a file out on a socket. This function -* is named after the UNIX "cat" command, because it might have been -* easier just to do something like pipe, fork, and exec("cat"). -* Parameters: the client socket descriptor -* FILE pointer for the file to cat */ -/**********************************************************************/ -void catb(void* client, FILE* ptr) -{ - unsigned char buffer[BUFFLEN]; - size_t size; - while(!feof(ptr)) - { - size = fread(buffer,1,BUFFLEN,ptr); - __b(client,buffer,size); - //if(!__b(client,buffer,size)) return; - } - //fclose(ptr); -} -void cat(void* client, FILE *resource) -{ - char buf[1024]; - //fgets(buf, sizeof(buf), resource); - while (fgets(buf, sizeof(buf), resource) != NULL) - { - antd_send(client, buf, strlen(buf)); - //fgets(buf, sizeof(buf), resource); - } - - -} - -/**********************************************************************/ -/* Inform the client that a CGI script could not be executed. -* Parameter: the client socket descriptor. */ -/**********************************************************************/ -void cannot_execute(void* client) -{ - set_status(client,500,"Internal Server Error"); - __t(client,SERVER_STRING); - __t(client,"Content-Type: text/html"); - response(client,""); - __t(client, "

Error prohibited CGI execution."); -} - /**********************************************************************/ /* Print out an error message with perror() (for system errors; based * on value of errno, which indicates system call errors) and exit the @@ -292,99 +235,20 @@ void error_die(const char *sc) perror(sc); exit(1); } - -/**********************************************************************/ -/* Get a line from a socket, whether the line ends in a newline, -* carriage return, or a CRLF combination. Terminates the string read -* with a null character. If no newline indicator is found before the -* end of the buffer, the string is terminated with a null. If any of -* the above three line terminators is read, the last character of the -* string will be a linefeed and the string will be terminated with a -* null character. -* Parameters: the socket descriptor -* the buffer to save the data in -* the size of the buffer -* Returns: the number of bytes stored (excluding null) */ -/**********************************************************************/ -//This function is deprecate -/*int get_line(int sock, char *buf, int size) +void* serve_file(void* data) { - int i = 0; - char c = '\0'; - int n; - - while ((i < size - 1) && (c != '\n')) - { - n = recv(sock, &c, 1, 0); - - if (n > 0) - { - if (c == '\r') - { - n = recv(sock, &c, 1, MSG_PEEK); - - if ((n > 0) && (c == '\n')) - recv(sock, &c, 1, 0); - else - c = '\n'; - } - buf[i] = c; - i++; - } - else - c = '\n'; - } - buf[i] = '\0'; - - return(i); -}*/ - - -/**********************************************************************/ -/* Give a client a 404 not found status message. */ -/**********************************************************************/ -void not_found(void* client) -{ - set_status(client,404,"NOT FOUND"); - __t(client,SERVER_STRING); - __t(client,"Content-Type: text/html"); - response(client,""); - __t(client, "Not Found"); - __t(client, "

The server could not fulfill"); - __t(client, "your request because the resource specified"); - __t(client, "is unavailable or nonexistent."); - __t(client, ""); -} - -/**********************************************************************/ -/* Send a regular file to the client. Use headers, and report -* errors to client if they occur. -* Parameters: a pointer to a file structure produced from the socket -* file descriptor -* the name of the file to serve */ -/**********************************************************************/ -void serve_file(void* client, const char *filename) -{ - LOG("Serve file: %s\n", filename); - FILE *resource = NULL; - int numchars = 1; - //char buf[1024]; - - //buf[0] = 'A'; buf[1] = '\0'; - //while ((numchars > 0) && strcmp("\n", buf)) /* read & discard headers */ - // numchars = get_line(client, buf, sizeof(buf)); - - resource = fopen(filename, "rb"); - if (resource == NULL) - not_found(client); + antd_request_t* rq = (antd_request_t*) data; + antd_task_t* task = antd_create_task(NULL,(void*)rq,NULL); + task->priority++; + char* path = (char*)dvalue(rq->request, "ABS_RESOURCE_PATH"); + char* newurl = NULL; + char* mime_type = (char*)dvalue(rq->request, "RESOURCE_MIME"); + ctype(rq->client,mime_type); + if(is_bin(path)) + __fb(rq->client, path); else - { - if(is_bin(filename)) - catb(client,resource); - else - cat(client, resource); - } - fclose(resource); + __f(rq->client, path); + return task; } /**********************************************************************/ @@ -422,32 +286,6 @@ int startup(unsigned *port) return(httpd); } -/**********************************************************************/ -/* Inform the client that the requested web method has not been -* implemented. -* Parameter: the client socket */ -/**********************************************************************/ -void unimplemented(void* client) -{ - set_status(client,501,"Method Not Implemented"); - __t(client,SERVER_STRING); - __t(client,"Content-Type: text/html"); - response(client,""); - __t(client, "Method Not Implemented"); - __t(client, ""); - __t(client, "

HTTP request method not supported."); - __t(client, ""); -} - -void badrequest(void* client) -{ - set_status(client,400,"Bad Request"); - __t(client,SERVER_STRING); - __t(client,"Content-Type: text/html"); - response(client,""); - __t(client,"The request could not be understood by the server due to malformed syntax."); -} - char* apply_rules(const char* host, char*url) { association it; @@ -491,32 +329,30 @@ char* apply_rules(const char* host, char*url) * - if it is a POST request with URL encoded : decode the url encode * - if it is a POST request with multipart form data: de code the multipart * - if other - UNIMPLEMENTED - * @param client socket client - * @param method HTTP method - * @param query query string in case of GET - * @return a dictionary of key- value + * @param an antd_request_t structure + * @return a task */ -dictionary decode_request(void* client,const char* method, char* url) + +void* decode_request_header(void* data) { - dictionary request = dict(); + antd_request_t* rq = (antd_request_t*) data; dictionary cookie = NULL; - dictionary xheader = dict(); char* line; char * token; char* query = NULL; - char* ctype = NULL; char* host = NULL; - int clen = -1; - + char buf[2*BUFFLEN]; + char* url = (char*)dvalue(rq->request, "REQUEST_QUERY"); + dictionary xheader = dict(); + dictionary request = dict(); + dput(rq->request,"REQUEST_HEADER",xheader); + dput(rq->request,"REQUEST_DATA",request); // first real all header // this for check if web socket is enabled - int ws= 0; - char* ws_key = NULL; - char buf[BUFFLEN]; // ip address - dput(xheader,"REMOTE_ADDR", (void*)strdup(((antd_client_t*)client)->ip )); + dput(xheader,"REMOTE_ADDR", (void*)strdup(((antd_client_t*)rq->client)->ip )); //while((line = read_line(client)) && strcmp("\r\n",line)) - while((read_buf(client,buf,sizeof(buf))) && strcmp("\r\n",buf)) + while((read_buf(rq->client,buf,sizeof(buf))) && strcmp("\r\n",buf)) { line = buf; trim(line, '\n'); @@ -530,109 +366,125 @@ dictionary decode_request(void* client,const char* method, char* url) { if(!cookie) cookie = decode_cookie(line); } - else if(token != NULL &&strcasecmp(token,"Content-Type") == 0) - { - ctype = strdup(line); //strsep(&line,":"); - trim(ctype,' '); - } else if(token != NULL &&strcasecmp(token,"Content-Length") == 0) - { - token = line; //strsep(&line,":"); - trim(token,' '); - clen = atoi(token); - } - else if(token != NULL && strcasecmp(token,"Upgrade") == 0) - { - // verify that the connection is upgrade to websocket - trim(line, ' '); - if(line != NULL && strcasecmp(line,"websocket") == 0) - ws = 1; - }else if(token != NULL && strcasecmp(token,"Host") == 0) + else if(token != NULL && strcasecmp(token,"Host") == 0) { host = strdup(line); } - else if(token != NULL && strcasecmp(token,"Sec-WebSocket-Key") == 0) - { - // get the key from the client - trim(line, ' '); - ws_key = strdup(line); - } } //if(line) free(line); - query = apply_rules(host, url); + memset(buf, 0, sizeof(buf)); + strcat(buf,url); + query = apply_rules(host, buf); + LOG("BUGFGGGG is %s\n", buf); + dput(rq->request,"RESOURCE_PATH",strdup(buf)); if(query) { LOG("Query: %s\n", query); decode_url_request(query, request); free(query); } + if(cookie) + dput(request,"cookie",cookie); if(host) free(host); - if(strcmp(method,"GET") == 0) + // header ok, now checkmethod + antd_task_t* task = antd_create_task(decode_request,(void*)rq, NULL); + task->priority++; + return task; +} + +void* decode_request(void* data) +{ + antd_request_t* rq = (antd_request_t*) data; + dictionary request = dvalue(rq->request, "REQUEST_DATA"); + dictionary headers = dvalue(rq->request, "REQUEST_HEADER"); + int ws = 0; + char*ws_key = NULL; + char* method = NULL; + char* tmp; + antd_task_t* task = NULL; + ws_key = (char*) dvalue(headers, "Sec-WebSocket-Key"); + tmp = (char*)dvalue(headers, "Upgrade"); + if(tmp && strcasecmp(tmp, "websocket") == 0) ws = 1; + method = (char*) dvalue(rq->request, "METHOD"); + task = antd_create_task(NULL,(void*)rq, NULL); + task->priority++; + if(strcmp(method,"GET") == 0 || strcmp(method,"PUT") == 0) { //if(ctype) free(ctype); if(ws && ws_key != NULL) { - ws_confirm_request(client, ws_key); + ws_confirm_request(rq->client, ws_key); free(ws_key); // insert wsocket flag to request // plugin should handle this ugraded connection // not the server - //if(!request) request = dict(); dput(request,"__web_socket__",strdup("1")); } + // resolve task + task->handle = resolve_request; + return task; + } + else if(strcmp(method,"POST") == 0) + { + task->handle = decode_post_request; + task->type = HEAVY; + return task; } else { - if(ws_key) - free(ws_key); - if(ctype == NULL || clen == -1) - { - LOG("Bad request\n"); - if(ctype) free(ctype); - if(cookie) freedict(cookie); - freedict(request); - freedict(xheader); - return NULL; - } - LOG("ContentType %s\n", ctype); - // decide what to do with the data - if(strstr(ctype,FORM_URL_ENCODE) > 0) - { - char* pquery = post_data_decode(client,clen); - decode_url_request(pquery, request); - free(pquery); - } else if(strstr(ctype,FORM_MULTI_PART)> 0) - { - //printf("Multi part form : %s\n", ctype); - decode_multi_part_request(client,ctype,request); - } - else - { - char* pquery = post_data_decode(client,clen); - char* key = strstr(ctype,"/"); - if(key) - key++; - else - key = ctype; - dput(request,key, strdup(pquery)); - free(pquery); - } + unimplemented(rq->client); + return task; } - if(ctype) free(ctype); - //if(cookie->key == NULL) {free(cookie);cookie= NULL;} - //if(!request) - // request = dict(); - if(cookie) - dput(request,"cookie",cookie); - dput(request,"__xheader__",xheader); - return request; } -void __px(const char* data,int size) + +void* decode_post_request(void* data) { - for (int i = 0; i < size; ++i) - printf(" %02x", data[i]); - - printf("\n"); + antd_request_t* rq = (antd_request_t*) data; + dictionary request = dvalue(rq->request, "REQUEST_DATA"); + dictionary headers = dvalue(rq->request, "REQUEST_HEADER"); + char* ctype = NULL; + int clen = -1; + char* tmp; + antd_task_t* task = NULL; + ctype = (char*) dvalue(headers, "Content-Type"); + tmp = (char*)dvalue(headers, "Content-Length"); + if(tmp) + clen = atoi(tmp); + task = antd_create_task(NULL,(void*)rq, NULL); + if(ctype == NULL || clen == -1) + { + LOG("Bad request\n"); + badrequest(rq->client); + return task; + } + LOG("ContentType %s\n", ctype); + // decide what to do with the data + if(strstr(ctype,FORM_URL_ENCODE) > 0) + { + char* pquery = post_data_decode(rq->client,clen); + decode_url_request(pquery, request); + free(pquery); + } else if(strstr(ctype,FORM_MULTI_PART)> 0) + { + //printf("Multi part form : %s\n", ctype); + // TODO: split this to multiple task + decode_multi_part_request(rq->client,ctype,request); + } + else + { + char* pquery = post_data_decode(rq->client,clen); + char* key = strstr(ctype,"/"); + if(key) + key++; + else + key = ctype; + dput(request,key, strdup(pquery)); + free(pquery); + } + task->handle = resolve_request; + return task; } + /** * Send header to the client to confirm * that the websocket is accepted by @@ -944,11 +796,11 @@ char* post_data_decode(void* client,int len) * @return -1 if failure * 1 if sucess */ -int execute_plugin(void* client, const char *path, const char *method, dictionary dic) +void* execute_plugin(void* data, const char *path) { char pname[255]; char pfunc[255]; - void (*fn)(void*, const char*,const char*, dictionary); + void* (*fn)(void*); struct plugin_entry *plugin ; int plen = strlen(path); char * rpath = (char*) malloc((plen+1)*sizeof(char)); @@ -958,6 +810,9 @@ int execute_plugin(void* client, const char *path, const char *method, dictionar rpath[plen] = '\0'; trim(rpath,'/'); char * delim = strchr(rpath,'/'); + antd_request_t* rq = (antd_request_t*) data; + antd_task_t* task = antd_create_task(NULL, (void*)rq, NULL); + task->priority++; if(delim == NULL) { strcpy(pname,rpath); @@ -973,9 +828,8 @@ int execute_plugin(void* client, const char *path, const char *method, dictionar memcpy(pfunc,rpath+npos+1,fpos); pfunc[fpos-1]='\0'; } - LOG("Client %d\n",((antd_client_t*)client)->sock ); + LOG("Client %d\n",((antd_client_t*)rq->client)->sock ); LOG("Path : '%s'\n", rpath); - LOG("Method:%s\n", method); LOG("Plugin name '%s'\n",pname); LOG("Query path. '%s'\n", pfunc); //LOG("query :%s\n", query_string); @@ -989,22 +843,23 @@ int execute_plugin(void* client, const char *path, const char *method, dictionar if( plugin == NULL) { if(orgs) free(orgs); - return -1; + unknow(rq->client); + return task; } } // load the function - fn = (void (*)(void*, const char *, const char*, dictionary))dlsym(plugin->handle, PLUGIN_HANDLER); + fn = (void* (*)(void*))dlsym(plugin->handle, PLUGIN_HANDLER); if ((error = dlerror()) != NULL) { if(orgs) free(orgs); LOG("Problem when finding %s method from %s : %s \n", PLUGIN_HANDLER, pname,error); - return -1; + unknow(rq->client); + return task; } - //dictionary dic = decode_request(client,method,query_string); - (*fn)(client,method,pfunc,dic); - //free(dic); + task->type = HEAVY; + task->handle = fn; free(orgs); - return 1; + return task; } #ifdef USE_OPENSSL diff --git a/http_server.h b/http_server.h index 8bc2259..60a0031 100644 --- a/http_server.h +++ b/http_server.h @@ -8,6 +8,7 @@ #include #include #include "libs/handle.h" +#include "libs/scheduler.h" #include "plugin_manager.h" #define FORM_URL_ENCODE "application/x-www-form-urlencoded" @@ -15,22 +16,19 @@ #define PLUGIN_HANDLER "handle" #define WS_MAGIC_STRING "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" - -#define ISspace(x) isspace((int)(x)) - #define SERVER_STRING "Server: ant-httpd" #define CONFIG "config.ini" extern config_t server_config; -void accept_request(void*); +void* accept_request(void*); void* finish_request(void*); void cat(void*, FILE *); void cannot_execute(void*); void error_die(const char *); //int get_line(int, char *, int); void not_found(void*); -void serve_file(void*, const char *); +void* serve_file(void*); int startup(unsigned *); void unimplemented(void*); void badrequest(void*); @@ -38,12 +36,14 @@ int rule_check(const char*, const char*, const char* , const char* , const char* void ws_confirm_request(void*, const char*); char* post_url_decode(void* client,int len); void decode_url_request(const char* query, dictionary); -dictionary decode_request(void* client,const char* method, char* url); +void* decode_request_header(void* data); +void* decode_request(void* data); +void* decode_post_request(void* data); +void* resolve_request(void* data); void decode_multi_part_request(void*,const char*, dictionary); dictionary decode_cookie(const char*); char* post_data_decode(void*,int); -int execute_plugin(void* client, const char *path, - const char *method, dictionary rq); +void* execute_plugin(void* data, const char *path); #endif \ No newline at end of file diff --git a/httpd.c b/httpd.c index 5777ca4..a5c2f4e 100644 --- a/httpd.c +++ b/httpd.c @@ -2,7 +2,6 @@ #include #include "http_server.h" #include "libs/ini.h" -#include "libs/scheduler.h" #include static antd_scheduler_t scheduler; @@ -179,11 +178,12 @@ void load_config(const char* file) } void stop_serve(int dummy) { UNUSED(dummy); + LOG("Shuting down server \n"); + antd_scheduler_destroy(&scheduler); list_free(&(server_config.rules)); freedict(server_config.handlers); LOG("Unclosed connection: %d\n", server_config.connection); unload_all_plugin(); - antd_scheduler_destroy(&scheduler); #ifdef USE_OPENSSL SSL_CTX_free(ctx); #endif @@ -200,7 +200,6 @@ int main(int argc, char* argv[]) int client_sock = -1; struct sockaddr_in client_name; socklen_t client_name_len = sizeof(client_name); - pthread_t newthread; char* client_ip = NULL; // ignore the broken PIPE error when writing //or reading to/from a closed socked connection diff --git a/libs/handle.c b/libs/handle.c index 670a0ab..ac61708 100644 --- a/libs/handle.c +++ b/libs/handle.c @@ -300,8 +300,41 @@ void clear_cookie(void* client, dictionary dic) } void unknow(void* client) { - html(client); - __t(client,"404 API not found"); + set_status(client,520,"Unknown Error"); + __t(client,"Content-Type: text/html; charset=utf-8"); + response(client,""); + __t(client,"520 Unknow request"); +} +void notfound(void* client) +{ + set_status(client,404,"Not found"); + __t(client,"Content-Type: text/html; charset=utf-8"); + response(client,""); + __t(client,"Resource not found"); +} +void badrequest(void* client) +{ + set_status(client,400,"Bad request"); + __t(client,"Content-Type: text/html; charset=utf-8"); + response(client,""); + __t(client,"400 Bad request"); +} +void unimplemented(void* client) +{ + set_status(client,501,"Method Not Implemented"); + __t(client,"Content-Type: text/html"); + response(client,""); + __t(client, "Method Not Implemented"); + __t(client, ""); + __t(client, "

HTTP request method not supported."); + __t(client, ""); +} +void cannot_execute(void* client) +{ + set_status(client,500,"Internal Server Error"); + __t(client,"Content-Type: text/html"); + response(client,""); + __t(client, "

Error prohibited CGI execution."); } int ws_enable(dictionary dic) { diff --git a/libs/handle.h b/libs/handle.h index 234cfeb..5aaf303 100644 --- a/libs/handle.h +++ b/libs/handle.h @@ -80,6 +80,9 @@ void set_status(void*,int,const char*); void clear_cookie(void*, dictionary); /*Default function for plugin*/ void unknow(void*); +void badrequest(void* client); +void unimplemented(void* client); +void notfound(void* client); int ws_enable(dictionary); char* read_line(void* sock); int read_buf(void* sock,char* buf,int i); From 89e3d7a3f043dad10e4273418b28c6cf6aa7624e Mon Sep 17 00:00:00 2001 From: lxsang Date: Fri, 5 Oct 2018 19:01:39 +0200 Subject: [PATCH 09/15] fix memory leak on the new non blocking system --- http_server.c | 578 +++++++++++++++++++++++++++++----------------- http_server.h | 14 +- httpd.c | 157 +++---------- libs/dictionary.c | 9 +- libs/dictionary.h | 1 - libs/handle.c | 8 +- libs/handle.h | 26 ++- libs/plugin.c | 10 +- libs/plugin.h | 19 +- libs/scheduler.c | 1 + libs/utils.c | 7 +- plugin_manager.c | 26 +-- plugin_manager.h | 3 +- 13 files changed, 461 insertions(+), 398 deletions(-) diff --git a/http_server.c b/http_server.c index 2da4dce..70cb3d7 100644 --- a/http_server.c +++ b/http_server.c @@ -1,52 +1,202 @@ #include "http_server.h" static pthread_mutex_t server_mux = PTHREAD_MUTEX_INITIALIZER; +config_t server_config; +config_t* config() +{ + return &server_config; +} -void* accept_request(void* client) +void destroy_config() +{ + list_free(&(server_config.rules)); + freedict(server_config.handlers); + if(server_config.plugins_dir) free(server_config.plugins_dir); + if(server_config.plugins_ext) free(server_config.plugins_ext); + if(server_config.db_path) free(server_config.db_path); + if(server_config.htdocs) free(server_config.htdocs); + if(server_config.tmpdir) free(server_config.tmpdir); + + LOG("Unclosed connection: %d\n", server_config.connection); +} + +static int config_handler(void* conf, const char* section, const char* name, + const char* value) +{ + config_t* pconfig = (config_t*)conf; + //char * ppath = NULL; + if (MATCH("SERVER", "port")) { + pconfig->port = atoi(value); + } else if (MATCH("SERVER", "plugins")) { + pconfig->plugins_dir = strdup(value); + } else if (MATCH("SERVER", "plugins_ext")) { + pconfig->plugins_ext = strdup(value); + } else if(MATCH("SERVER", "database")) { + pconfig->db_path = strdup(value); + } else if(MATCH("SERVER", "htdocs")) { + pconfig->htdocs = strdup(value); + } else if(MATCH("SERVER", "tmpdir")) { + pconfig->tmpdir = strdup(value); + } + else if(MATCH("SERVER", "maxcon")) { + pconfig->maxcon = atoi(value); + } + else if(MATCH("SERVER", "backlog")) { + pconfig->backlog = atoi(value); + } + else if(MATCH("SERVER", "workers")) { + pconfig->n_workers = atoi(value); + } +#ifdef USE_OPENSSL + else if(MATCH("SERVER", "ssl.enable")) { + pconfig->usessl = atoi(value); + } + else if(MATCH("SERVER", "ssl.cert")) { + pconfig->sslcert = strdup(value); + } + else if(MATCH("SERVER", "ssl.key")) { + pconfig->sslkey = strdup(value); + } +#endif + else if (strcmp(section, "RULES") == 0) + { + list_put_s(&pconfig->rules, name); + list_put_s(&pconfig->rules, value); + } + else if (strcmp(section, "FILEHANDLER") == 0) + { + dput( pconfig->handlers, name ,strdup(value)); + } + else if(strcmp(section,"AUTOSTART")==0){ + // The server section must be added before the autostart section + // auto start plugin + plugin_load(value); + } else { + return 0; /* unknown section/name, error */ + } + return 1; +} +void init_file_system() +{ + struct stat st; + if (stat(server_config.plugins_dir, &st) == -1) + mkdir(server_config.plugins_dir, 0755); + if (stat(server_config.db_path, &st) == -1) + mkdir(server_config.db_path, 0755); + if (stat(server_config.htdocs, &st) == -1) + mkdir(server_config.htdocs, 0755); + if (stat(server_config.tmpdir, &st) == -1) + mkdir(server_config.tmpdir, 0755); + else + { + removeAll(server_config.tmpdir,0); + } + +} +void load_config(const char* file) +{ + server_config.port = 8888; + server_config.plugins_dir = "plugins/"; + server_config.plugins_ext = ".dylib"; + server_config.db_path = "databases/"; + server_config.htdocs = "htdocs/"; + server_config.tmpdir = "tmp/"; + server_config.n_workers = 4; + server_config.backlog = 100; + server_config.rules = list_init(); + server_config.handlers = dict(); + server_config.maxcon = 1000; + server_config.connection = 0; +#ifdef USE_OPENSSL + server_config.usessl = 0; + server_config.sslcert = "cert.pem"; + server_config.sslkey = "key.pem"; +#endif + if (ini_parse(file, config_handler, &server_config) < 0) { + LOG("Can't load '%s'\n. Used defaut configuration", file); + } + else + { + LOG("Using configuration : %s\n", file); +#ifdef USE_OPENSSL + LOG("SSL enable %d\n", server_config.usessl); + LOG("SSL cert %s\n", server_config.sslcert); + LOG("SSL key %s\n", server_config.sslkey); +#endif + } + init_file_system(); +} +void set_nonblock(int socket) { + int flags; + flags = fcntl(socket,F_GETFL,0); + //assert(flags != -1); + fcntl(socket, F_SETFL, flags | O_NONBLOCK); +} + +void* accept_request(void* data) { int count; char buf[BUFFLEN]; - antd_request_t* request; - antd_task_t* task; char* token = NULL; char* line = NULL; - request = (antd_request_t*)malloc(sizeof(*request)); - request->client = client; - request->request = dict(); - count = read_buf(client, buf, sizeof(buf)); - task = antd_create_task(NULL,(void*)request,NULL); + antd_task_t* task; + antd_request_t* rq = (antd_request_t*) data; + + task = antd_create_task(NULL,(void*)rq,NULL); task->priority++; - if(count <= 0) + server_config.connection++; + fd_set read_flags; + // first verify if the socket is ready + antd_client_t* client = (antd_client_t*) rq->client; + FD_ZERO(&read_flags); + FD_SET(rq->client->sock, &read_flags); + struct timeval timeout; + timeout.tv_sec = 0; + timeout.tv_usec = 500; + // select + int sel = select(client->sock+1, &read_flags, NULL, (fd_set*)0, &timeout); + if(sel == -1) { - unknow(client); + unknow(rq->client); return task; } + if(sel == 0 || !FD_ISSET(client->sock, &read_flags) ) + { + // retry it later + server_config.connection--; + task->handle = accept_request; + return task; + } + count = read_buf(rq->client, buf, sizeof(buf)); + //LOG("count is %d\n", count); line = buf; // get the method string token = strsep(&line," "); if(!line) { - unknow(client); + LOG("No method found\n"); + unknow(rq->client); return task; } trim(token,' '); trim(line,' '); - dput(request->request, "METHOD", strdup(token)); + dput(rq->request, "METHOD", strdup(token)); // get the request token = strsep(&line, " "); if(!line) { - unknow(client); + LOG("No request found\n"); + unknow(rq->client); return task; } trim(token,' '); trim(line,' '); trim(line, '\n'); trim(line, '\r'); - dput(request->request, "PROTOCOL", strdup(line)); - dput(request->request, "REQUEST_QUERY", strdup(token)); + dput(rq->request, "PROTOCOL", strdup(line)); + dput(rq->request, "REQUEST_QUERY", strdup(token)); line = token; token = strsep(&line, "?"); - dput(request->request, "REQUEST_PATH", strdup(token)); + dput(rq->request, "REQUEST_PATH", strdup(token)); // decode request // now return the task task->handle = decode_request_header; @@ -69,9 +219,6 @@ void* resolve_request(void* data) //if (path[strlen(path) - 1] == '/') // strcat(path, "index.html"); if (stat(path, &st) == -1) { - //if(execute_plugin(rq->client,rqp,method,rq) < 0) - // not_found(client); - LOG("execute plugin \n"); free(task); return execute_plugin(rq, rqp); } @@ -149,9 +296,23 @@ void* finish_request(void* data) LOG("Close request\n"); antd_request_t* rq = (antd_request_t*)data; // free all other thing - if(rq->request) freedict(rq->request); + if(rq->request) + { + dictionary tmp = dvalue(rq->request, "COOKIE"); + if(tmp) freedict(tmp); + tmp = dvalue(rq->request, "REQUEST_HEADER"); + if(tmp) freedict(tmp); + tmp = dvalue(rq->request, "REQUEST_DATA"); + if(tmp) freedict(tmp); + dput(rq->request, "REQUEST_HEADER", NULL); + dput(rq->request, "REQUEST_DATA", NULL); + dput(rq->request, "COOKIE", NULL); + freedict(rq->request); + } antd_close(rq->client); free(rq); + server_config.connection--; + LOG("Remaining connection %d\n", server_config.connection); return NULL; } @@ -225,12 +386,8 @@ int rule_check(const char*k, const char* v, const char* host, const char* _url, free(query); return 1; } -/**********************************************************************/ -/* Print out an error message with perror() (for system errors; based -* on value of errno, which indicates system call errors) and exit the -* program indicating an error. */ -/**********************************************************************/ -void error_die(const char *sc) + +static void error_die(const char *sc) { perror(sc); exit(1); @@ -251,14 +408,6 @@ void* serve_file(void* data) return task; } -/**********************************************************************/ -/* This function starts the process of listening for web connections -* on a specified port. If the port is 0, then dynamically allocate a -* port and modify the original port variable to reflect the actual -* port. -* Parameters: pointer to variable containing the port to connect on -* Returns: the socket */ -/**********************************************************************/ int startup(unsigned *port) { int httpd = 0; @@ -322,15 +471,7 @@ char* apply_rules(const char* host, char*url) return strdup(query_string); } /** - * Decode the HTTP request - * Get the cookie values - * if it is the GET request, decode the query string into a dictionary - * if it is a POST, check the content type of the request - * - if it is a POST request with URL encoded : decode the url encode - * - if it is a POST request with multipart form data: de code the multipart - * - if other - UNIMPLEMENTED - * @param an antd_request_t structure - * @return a task + * Decode the HTTP request header */ void* decode_request_header(void* data) @@ -375,7 +516,6 @@ void* decode_request_header(void* data) memset(buf, 0, sizeof(buf)); strcat(buf,url); query = apply_rules(host, buf); - LOG("BUGFGGGG is %s\n", buf); dput(rq->request,"RESOURCE_PATH",strdup(buf)); if(query) { @@ -384,7 +524,7 @@ void* decode_request_header(void* data) free(query); } if(cookie) - dput(request,"cookie",cookie); + dput(rq->request,"COOKIE",cookie); if(host) free(host); // header ok, now checkmethod antd_task_t* task = antd_create_task(decode_request,(void*)rq, NULL); @@ -451,6 +591,7 @@ void* decode_post_request(void* data) if(tmp) clen = atoi(tmp); task = antd_create_task(NULL,(void*)rq, NULL); + task->priority++; if(ctype == NULL || clen == -1) { LOG("Bad request\n"); @@ -468,7 +609,8 @@ void* decode_post_request(void* data) { //printf("Multi part form : %s\n", ctype); // TODO: split this to multiple task - decode_multi_part_request(rq->client,ctype,request); + free(task); + return decode_multi_part_request(rq,ctype,request); } else { @@ -545,193 +687,207 @@ dictionary decode_cookie(const char* line) token1 = strsep(&token,"="); if(token1 && token && strlen(token) > 0) { - if(dic == NULL) - dic = dict(); + if(dic == NULL) dic = dict(); + LOG("%s: %s\n", token1, token); dput(dic,token1,strdup(token)); } } - //} free(orgcpy); return dic; } /** * Decode the multi-part form data from the POST request * If it is a file upload, copy the file to tmp dir - * and generate the metadata for the server-side - * @param client the socket client - * @param ctype Content-Type of the request - * @param clen Content length, but not used here - * @return a dictionary of key - value */ -void decode_multi_part_request(void* client,const char* ctype, dictionary dic) +void* decode_multi_part_request(void* data,const char* ctype, dictionary dic) { char * boundary; - char * boundend; char * line; - char * orgline; char * str_copy = strdup(ctype); char* orgcpy = str_copy; - char* token; - char* keytoken ; - char* valtoken ; - char* part_name; - char* part_file; + antd_request_t* rq = (antd_request_t*) data; + antd_task_t* task = antd_create_task(NULL, (void*)rq, NULL); + task->priority++; + //dictionary dic = NULL; + FILE *fp = NULL; + boundary = strsep(&str_copy,"="); //discard first part + boundary = str_copy; + if(boundary && strlen(boundary)>0) + { + //dic = dict(); + trim(boundary,' '); + dput(rq->request, "MULTI_PART_BOUNDARY", strdup(boundary)); + //find first boundary + while((line = read_line(rq->client))&&strstr(line,boundary) <= 0) + { + if(line) free(line); + } + if(line) + { + task->handle = decode_multi_part_request_data; + task->type = HEAVY; + free(line); + } + } + free(orgcpy); + return task; +} +void* decode_multi_part_request_data(void* data) +{ + // loop through each part separated by the boundary + char* line; + char* orgline; + char* part_name = NULL; + char* part_file = NULL; char* file_path; char buf[BUFFLEN]; char* field; //dictionary dic = NULL; FILE *fp = NULL; - boundary = strsep(&str_copy,"="); //discard first part - boundary = strsep(&str_copy,"="); - if(boundary && strlen(boundary)>0) + char* token, *keytoken, *valtoken; + antd_request_t* rq = (antd_request_t*) data; + antd_task_t* task = antd_create_task(NULL, (void*)rq, NULL); + task->priority++; + char* boundary = (char*)dvalue(rq->request, "MULTI_PART_BOUNDARY"); + dictionary dic = (dictionary)dvalue(rq->request, "REQUEST_DATA"); + char* boundend = __s("%s--",boundary); + // search for content disposition: + while((line = read_line(rq->client)) && + strstr(line,"Content-Disposition:") <= 0) { - //dic = dict(); - trim(boundary,' '); - boundend = __s("%s--",boundary); - //find first boundary - while((line = read_line(client))&&strstr(line,boundary) <= 0) - { - if(line) free(line); - } - // loop through each part separated by the boundary - while(line && strstr(line,boundary) > 0){ - if(line) - { - free(line); - line = NULL; - } - // search for content disposition: - while((line = read_line(client)) && - strstr(line,"Content-Disposition:") <= 0) - { - free(line); - line = NULL; - } - if(!line || strstr(line,"Content-Disposition:") <= 0) - { - if(line) - free(line); - free(orgcpy); - free(boundend); - return; - } - orgline = line; - // extract parameters from header - part_name = NULL; - part_file = NULL; - while((token = strsep(&line,";"))) - { - keytoken = strsep(&token,"="); - if(keytoken && strlen(keytoken)>0) - { - trim(keytoken,' '); - valtoken = strsep(&token,"="); - if(valtoken) - { - trim(valtoken,' '); - trim(valtoken,'\n'); - trim(valtoken,'\r'); - trim(valtoken,'\"'); - if(strcmp(keytoken,"name") == 0) - { - part_name = strdup(valtoken); - } else if(strcmp(keytoken,"filename") == 0) - { - part_file = strdup(valtoken); - } - } - } - } - free(orgline); - line = NULL; - // get the binary data - if(part_name != NULL) - { - // go to the beginer of data bock - while((line = read_line(client)) && strcmp(line,"\r\n") != 0) - { - free(line); - line = NULL; - } - if(line) - { - free(line); - line = NULL; - } - if(part_file == NULL) - { - /** - * This allow only 1024 bytes of data (max), - * out of this range, the data is cut out. - * Need an efficient way to handle this - */ - line = read_line(client); - trim(line,'\n'); - trim(line,'\r'); - trim(line,' '); - dput(dic,part_name,line); - // find the next boundary - while((line = read_line(client)) && strstr(line,boundary) <= 0) - { - free(line); - line = NULL; - } - } - else - { - file_path = __s("%s%s.%u",server_config.tmpdir,part_file,(unsigned)time(NULL)); - fp=fopen(file_path, "wb"); - if(fp) - { - int totalsize=0,len=0; - //read until the next boundary - while((len = read_buf(client,buf,sizeof(buf))) > 0 && strstr(buf,boundary) <= 0) - { - fwrite(buf, len, 1, fp); - totalsize += len; - } - //remove \r\n at the end - fseek(fp,-2, SEEK_CUR); - totalsize -= 2; - fclose(fp); - line = strdup(buf); - - field = __s("%s.file",part_name); - dput(dic,field, strdup(part_file)); - free(field); - field = __s("%s.tmp",part_name); - dput(dic,field,strdup(file_path)); - free(field); - field = __s("%s.size",part_name); - dput(dic,field,__s("%d",totalsize)); - free(field); - field = __s("%s.ext",part_name); - dput(dic,field,ext(part_file)); - free(field); - - } - else - { - LOG("Cannot wirte file to :%s\n", file_path ); - } - free(file_path); - free(part_file); - } - free(part_name); - } - //printf("[Lines]:%s\n",line); - // check if end of request - if(line&&strstr(line,boundend)>0) - { - LOG("End request %s\n", boundend); - free(line); - break; - } - } - free(boundend); + free(line); + line = NULL; } - free(orgcpy); - //return dic; + if(!line || strstr(line,"Content-Disposition:") <= 0) + { + if(line) + free(line); + free(boundend); + return task; + } + orgline = line; + // extract parameters from header + while((token = strsep(&line,";"))) + { + keytoken = strsep(&token,"="); + if(keytoken && strlen(keytoken)>0) + { + trim(keytoken,' '); + valtoken = strsep(&token,"="); + if(valtoken) + { + trim(valtoken,' '); + trim(valtoken,'\n'); + trim(valtoken,'\r'); + trim(valtoken,'\"'); + if(strcmp(keytoken,"name") == 0) + { + part_name = strdup(valtoken); + } else if(strcmp(keytoken,"filename") == 0) + { + part_file = strdup(valtoken); + } + } + } + } + free(orgline); + line = NULL; + // get the binary data + if(part_name != NULL) + { + // go to the beginer of data bock + while((line = read_line(rq->client)) && strcmp(line,"\r\n") != 0) + { + free(line); + line = NULL; + } + if(line) + { + free(line); + line = NULL; + } + if(part_file == NULL) + { + /** + * This allow only 1024 bytes of data (max), + * out of this range, the data is cut out. + * Need an efficient way to handle this + */ + line = read_line(rq->client); + trim(line,'\n'); + trim(line,'\r'); + trim(line,' '); + dput(dic,part_name,line); + // find the next boundary + while((line = read_line(rq->client)) && strstr(line,boundary) <= 0) + { + free(line); + line = NULL; + } + } + else + { + file_path = __s("%s%s.%u",server_config.tmpdir,part_file,(unsigned)time(NULL)); + fp=fopen(file_path, "wb"); + if(fp) + { + int totalsize=0,len=0; + //read until the next boundary + while((len = read_buf(rq->client,buf,sizeof(buf))) > 0 && strstr(buf,boundary) <= 0) + { + fwrite(buf, len, 1, fp); + totalsize += len; + } + //remove \r\n at the end + fseek(fp, 0, SEEK_SET); + //fseek(fp,-2, SEEK_CUR); + totalsize -= 2; + ftruncate(fileno(fp),totalsize); + fclose(fp); + line = strdup(buf); + + field = __s("%s.file",part_name); + dput(dic,field, strdup(part_file)); + free(field); + field = __s("%s.tmp",part_name); + dput(dic,field,strdup(file_path)); + free(field); + field = __s("%s.size",part_name); + dput(dic,field,__s("%d",totalsize)); + free(field); + field = __s("%s.ext",part_name); + dput(dic,field,ext(part_file)); + free(field); + + } + else + { + LOG("Cannot wirte file to :%s\n", file_path ); + } + free(file_path); + free(part_file); + } + free(part_name); + } + //printf("[Lines]:%s\n",line); + // check if end of request + if(line&&strstr(line,boundend)>0) + { + LOG("End request %s\n", boundend); + task->handle = resolve_request; + free(line); + free(boundend); + return task; + } + if(line && strstr(line,boundary) > 0) + { + // continue upload + task->type = HEAVY; + task->handle = decode_multi_part_request_data; + } + free(line); + free(boundend); + return task; } /** * Decode a query string (GET request or POST URL encoded) to diff --git a/http_server.h b/http_server.h index 60a0031..6a965c7 100644 --- a/http_server.h +++ b/http_server.h @@ -7,6 +7,8 @@ #include #include #include +#include +#include #include "libs/handle.h" #include "libs/scheduler.h" #include "plugin_manager.h" @@ -15,17 +17,16 @@ #define FORM_MULTI_PART "multipart/form-data" #define PLUGIN_HANDLER "handle" #define WS_MAGIC_STRING "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" - -#define SERVER_STRING "Server: ant-httpd" +#define MATCH(s, n) strcmp(section, s) == 0 && strcmp(name, n) == 0 #define CONFIG "config.ini" -extern config_t server_config; +config_t* config(); +void destroy_config(); void* accept_request(void*); void* finish_request(void*); void cat(void*, FILE *); void cannot_execute(void*); -void error_die(const char *); //int get_line(int, char *, int); void not_found(void*); void* serve_file(void*); @@ -40,10 +41,11 @@ void* decode_request_header(void* data); void* decode_request(void* data); void* decode_post_request(void* data); void* resolve_request(void* data); -void decode_multi_part_request(void*,const char*, dictionary); +void* decode_multi_part_request(void*,const char*, dictionary); +void* decode_multi_part_request_data(void* data); dictionary decode_cookie(const char*); char* post_data_decode(void*,int); - +void set_nonblock(int); void* execute_plugin(void* data, const char *path); #endif \ No newline at end of file diff --git a/httpd.c b/httpd.c index a5c2f4e..89ba04f 100644 --- a/httpd.c +++ b/httpd.c @@ -2,11 +2,9 @@ #include #include "http_server.h" #include "libs/ini.h" -#include static antd_scheduler_t scheduler; +static int server_sock = -1; -#define MATCH(s, n) strcmp(section, s) == 0 && strcmp(name, n) == 0 -int server_sock = -1; #ifdef USE_OPENSSL static int ssl_session_ctx_id = 1; SSL_CTX *ctx; @@ -55,13 +53,13 @@ void configure_context(SSL_CTX *ctx) SSL_CTX_set_session_id_context(ctx, (void *)&ssl_session_ctx_id, sizeof(ssl_session_ctx_id)); /* Set the key and cert */ /* use the full chain bundle of certificate */ - //if (SSL_CTX_use_certificate_file(ctx, server_config.sslcert, SSL_FILETYPE_PEM) <= 0) { - if (SSL_CTX_use_certificate_chain_file(ctx, server_config.sslcert) <= 0) { + //if (SSL_CTX_use_certificate_file(ctx, server_config->sslcert, SSL_FILETYPE_PEM) <= 0) { + if (SSL_CTX_use_certificate_chain_file(ctx, config()->sslcert) <= 0) { ERR_print_errors_fp(stderr); exit(EXIT_FAILURE); } - if (SSL_CTX_use_PrivateKey_file(ctx, server_config.sslkey, SSL_FILETYPE_PEM) <= 0 ) { + if (SSL_CTX_use_PrivateKey_file(ctx, config()->sslkey, SSL_FILETYPE_PEM) <= 0 ) { ERR_print_errors_fp(stderr); exit(EXIT_FAILURE); } @@ -74,120 +72,23 @@ void configure_context(SSL_CTX *ctx) #endif -static int config_handler(void* conf, const char* section, const char* name, - const char* value) -{ - config_t* pconfig = (config_t*)conf; - //char * ppath = NULL; - if (MATCH("SERVER", "port")) { - pconfig->port = atoi(value); - } else if (MATCH("SERVER", "plugins")) { - pconfig->plugins_dir = strdup(value); - } else if (MATCH("SERVER", "plugins_ext")) { - pconfig->plugins_ext = strdup(value); - } else if(MATCH("SERVER", "database")) { - pconfig->db_path = strdup(value); - } else if(MATCH("SERVER", "htdocs")) { - pconfig->htdocs = strdup(value); - } else if(MATCH("SERVER", "tmpdir")) { - pconfig->tmpdir = strdup(value); - } - else if(MATCH("SERVER", "maxcon")) { - pconfig->maxcon = atoi(value); - } - else if(MATCH("SERVER", "backlog")) { - pconfig->backlog = atoi(value); - } -#ifdef USE_OPENSSL - else if(MATCH("SERVER", "ssl.enable")) { - pconfig->usessl = atoi(value); - } - else if(MATCH("SERVER", "ssl.cert")) { - pconfig->sslcert = strdup(value); - } - else if(MATCH("SERVER", "ssl.key")) { - pconfig->sslkey = strdup(value); - } -#endif - else if (strcmp(section, "RULES") == 0) - { - list_put_s(&pconfig->rules, name); - list_put_s(&pconfig->rules, value); - } - else if (strcmp(section, "FILEHANDLER") == 0) - { - dput( pconfig->handlers, name ,strdup(value)); - } - else if(strcmp(section,"AUTOSTART")==0){ - // The server section must be added before the autostart section - // auto start plugin - plugin_load(value); - } else { - return 0; /* unknown section/name, error */ - } - return 1; -} -void init_file_system() -{ - struct stat st; - if (stat(server_config.plugins_dir, &st) == -1) - mkdir(server_config.plugins_dir, 0755); - if (stat(server_config.db_path, &st) == -1) - mkdir(server_config.db_path, 0755); - if (stat(server_config.htdocs, &st) == -1) - mkdir(server_config.htdocs, 0755); - if (stat(server_config.tmpdir, &st) == -1) - mkdir(server_config.tmpdir, 0755); - else - { - removeAll(server_config.tmpdir,0); - } - -} -void load_config(const char* file) -{ - server_config.port = 8888; - server_config.plugins_dir = "plugins/"; - server_config.plugins_ext = ".dylib"; - server_config.db_path = "databases/"; - server_config.htdocs = "htdocs"; - server_config.tmpdir = "tmp"; - server_config.backlog = 100; - server_config.rules = list_init(); - server_config.handlers = dict(); - server_config.maxcon = 1000; - server_config.connection = 0; -#ifdef USE_OPENSSL - server_config.usessl = 0; - server_config.sslcert = "cert.pem"; - server_config.sslkey = "key.pem"; -#endif - if (ini_parse(file, config_handler, &server_config) < 0) { - LOG("Can't load '%s'\n. Used defaut configuration", file); - } - else - { - LOG("Using configuration : %s\n", file); -#ifdef USE_OPENSSL - LOG("SSL enable %d\n", server_config.usessl); - LOG("SSL cert %s\n", server_config.sslcert); - LOG("SSL key %s\n", server_config.sslkey); -#endif - } - init_file_system(); -} void stop_serve(int dummy) { UNUSED(dummy); - LOG("Shuting down server \n"); + sigset_t mask; + sigemptyset(&mask); + //Blocks the SIG_IGN signal (by adding SIG_IGN to newMask) + sigaddset(&mask, SIGINT); + sigaddset(&mask, SIGPIPE); + sigaddset(&mask, SIGABRT); + sigprocmask(SIG_BLOCK, &mask, NULL); antd_scheduler_destroy(&scheduler); - list_free(&(server_config.rules)); - freedict(server_config.handlers); - LOG("Unclosed connection: %d\n", server_config.connection); - unload_all_plugin(); + unload_all_plugin(); + destroy_config(); #ifdef USE_OPENSSL SSL_CTX_free(ctx); #endif close(server_sock); + sigprocmask(SIG_UNBLOCK, &mask, NULL); } int main(int argc, char* argv[]) { @@ -196,7 +97,7 @@ int main(int argc, char* argv[]) load_config(CONFIG); else load_config(argv[1]); - unsigned port = server_config.port; + unsigned port = config()->port; int client_sock = -1; struct sockaddr_in client_name; socklen_t client_name_len = sizeof(client_name); @@ -208,7 +109,7 @@ int main(int argc, char* argv[]) signal(SIGINT, stop_serve); #ifdef USE_OPENSSL - if( server_config.usessl == 1 ) + if( config()->usessl == 1 ) { init_openssl(); ctx = create_context(); @@ -217,12 +118,11 @@ int main(int argc, char* argv[]) } #endif - server_sock = startup(&port); LOG("httpd running on port %d\n", port); // default to 4 workers - antd_scheduler_init(&scheduler, 4); - fcntl(server_sock, F_SETFL, O_NONBLOCK); + antd_scheduler_init(&scheduler, config()->n_workers); + set_nonblock(server_sock); while (scheduler.status) { antd_task_schedule(&scheduler); @@ -233,6 +133,9 @@ int main(int argc, char* argv[]) continue; } antd_client_t* client = (antd_client_t*)malloc(sizeof(antd_client_t)); + antd_request_t* request = (antd_request_t*)malloc(sizeof(*request)); + request->client = client; + request->request = dict(); /* get the remote IP */ @@ -242,27 +145,29 @@ int main(int argc, char* argv[]) client_ip = inet_ntoa(client_name.sin_addr); client->ip = strdup(client_ip); LOG("Client IP: %s\n", client_ip); + LOG("socket: %d\n", client_sock); } //return &(((struct sockaddr_in6*)sa)->sin6_addr); /* accept_request(client_sock); */ // set timeout to socket - struct timeval timeout; - timeout.tv_sec = 20; - timeout.tv_usec = 0; + set_nonblock(client_sock); + /*struct timeval timeout; + timeout.tv_sec = 0; + timeout.tv_usec = 5000; if (setsockopt (client_sock, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout,sizeof(timeout)) < 0) perror("setsockopt failed\n"); if (setsockopt (client_sock, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout,sizeof(timeout)) < 0) perror("setsockopt failed\n"); - + */ client->sock = client_sock; - server_config.connection++; - //LOG("Unclosed connection: %d\n", server_config.connection); + // 100 times retry connection before abort + //LOG("Unclosed connection: %d\n", server_config->connection); #ifdef USE_OPENSSL client->ssl = NULL; - if(server_config.usessl == 1) + if(config()->usessl == 1) { client->ssl = (void*)SSL_new(ctx); if(!client->ssl) continue; @@ -276,7 +181,7 @@ int main(int argc, char* argv[]) } #endif // create callback for the server - antd_add_task(&scheduler, antd_create_task(accept_request,(void*)client, finish_request )); + antd_add_task(&scheduler, antd_create_task(accept_request,(void*)request, finish_request )); /*if (pthread_create(&newthread , NULL,(void *(*)(void *))accept_request, (void *)client) != 0) { perror("pthread_create"); diff --git a/libs/dictionary.c b/libs/dictionary.c index a77dcb1..89e54c0 100644 --- a/libs/dictionary.c +++ b/libs/dictionary.c @@ -52,18 +52,25 @@ association __put_el_with_key(dictionary dic, const char* key) if(dic == NULL) return NULL; if ((np = dlookup(dic,key)) == NULL) { /* not found */ np = (association) malloc(sizeof(*np)); + np->value = NULL; if (np == NULL || (np->key = strdup(key)) == NULL) return NULL; hashval = hash(key, DHASHSIZE); np->next = dic[hashval]; dic[hashval] = np; } + // found return np; } association dput(dictionary dic,const char* key, void* value) { association np = __put_el_with_key(dic,key); - if(np == NULL) return NULL; + if(np == NULL) + { + if(value) free(value); + return NULL; + } + if(np->value && value) free(np->value); np->value = value; return np; } diff --git a/libs/dictionary.h b/libs/dictionary.h index 1675df4..752001a 100644 --- a/libs/dictionary.h +++ b/libs/dictionary.h @@ -46,6 +46,5 @@ void* dvalue(dictionary, const char*); association dput(dictionary,const char*, void*); int dremove(dictionary, const char*); void freedict(dictionary); -void stest(const char* ); #endif \ No newline at end of file diff --git a/libs/handle.c b/libs/handle.c index ac61708..eea6f16 100644 --- a/libs/handle.c +++ b/libs/handle.c @@ -1,5 +1,4 @@ -#include "handle.h" -config_t server_config; +#include "handle.h" #ifdef USE_OPENSSL int usessl() { @@ -133,10 +132,7 @@ int antd_close(void* src) #endif //printf("Close sock %d\n", source->sock); int ret = close(source->sock); - if(source->ip) free(source->ip); - // TODO remove this when using nonblocking - server_config.connection--; - LOG("Remaining connection %d\n", server_config.connection); + if(source->ip) free(source->ip); free(src); src = NULL; return ret; diff --git a/libs/handle.h b/libs/handle.h index 5aaf303..61da196 100644 --- a/libs/handle.h +++ b/libs/handle.h @@ -15,7 +15,7 @@ #include "list.h" #include "ini.h" -#define SERVER_NAME "antd" +#define SERVER_NAME "Antd" #define IS_POST(method) (strcmp(method,"POST")== 0) #define IS_GET(method) (strcmp(method,"GET")== 0) #define R_STR(d,k) ((char*)dvalue(d,k)) @@ -28,6 +28,18 @@ #ifdef USE_OPENSSL int __attribute__((weak)) usessl(); #endif +//extern config_t server_config; +typedef struct{ + int sock; + void* ssl; + char* ip; +} antd_client_t; + +typedef struct { + antd_client_t* client; + dictionary request; +} antd_request_t; + typedef struct { int port; @@ -41,23 +53,13 @@ typedef struct { int backlog; int maxcon; int connection; + int n_workers; #ifdef USE_OPENSSL int usessl; char* sslcert; char* sslkey; #endif }config_t; -//extern config_t server_config; -typedef struct{ - int sock; - void* ssl; - char* ip; -} antd_client_t; - -typedef struct { - antd_client_t* client; - dictionary request; -} antd_request_t; int response(void*, const char*); void ctype(void*,const char*); diff --git a/libs/plugin.c b/libs/plugin.c index b804861..31799c0 100644 --- a/libs/plugin.c +++ b/libs/plugin.c @@ -1,6 +1,6 @@ #include "plugin.h" -plugin_header __plugin__; +plugin_header_t __plugin__; // private function void __init_plugin__(const char* pl,config_t* conf){ __plugin__.name = strdup(pl); @@ -40,7 +40,10 @@ int usessl() return __plugin__.usessl; } #endif*/ - +plugin_header_t* meta() +{ + return &__plugin__; +} char* route(const char* repath) { int len = strlen(__plugin__.name) + 2; @@ -74,8 +77,9 @@ char* config_dir() return path; } -void __release() +void __release__() { + destroy(); printf("Releasing plugin\n"); if(__plugin__.name) free(__plugin__.name); if(__plugin__.dbpath) free(__plugin__.dbpath); diff --git a/libs/plugin.h b/libs/plugin.h index 8f5ac48..468dad0 100644 --- a/libs/plugin.h +++ b/libs/plugin.h @@ -5,6 +5,7 @@ #include "dbhelper.h" #endif #include "ws.h" +#include "scheduler.h" typedef struct { char *name; @@ -15,7 +16,7 @@ typedef struct { #ifdef USE_OPENSSL int usessl; #endif -} plugin_header; +} plugin_header_t; @@ -23,17 +24,6 @@ typedef struct { #ifdef USE_DB typedef sqlite3* sqldb; #endif -/* -Two server, -Two configuration different -Does it work -Replace this by a accessing function -that execute the set value to -the header, instead of -exporting global variables -*/ -extern plugin_header __plugin__; -//extern call __init__; #ifdef USE_DB @@ -47,6 +37,7 @@ char* config_dir(); /*Default function for plugin*/ // init the plugin void init(); -void handle(void*, const char*,const char*,dictionary); -void __release(); +void destroy(); +void* handle(void*); +plugin_header_t* meta(); #endif diff --git a/libs/scheduler.c b/libs/scheduler.c index 343fed7..0800231 100644 --- a/libs/scheduler.c +++ b/libs/scheduler.c @@ -165,6 +165,7 @@ void antd_scheduler_destroy(antd_scheduler_t* scheduler) { // free all the chains stop(scheduler); + LOG("Destroy remaining queue\n"); for(int i=0; i < N_PRIORITY; i++) { destroy_queue(scheduler->task_queue[i]); diff --git a/libs/utils.c b/libs/utils.c index 7ecef2b..aa34951 100644 --- a/libs/utils.c +++ b/libs/utils.c @@ -145,7 +145,11 @@ char* ext(const char* file) if(file == NULL) return NULL; char* str_cpy = strdup(file); char* str_org = str_cpy; - if(strstr(str_cpy,".")<= 0) return strdup(""); + if(strstr(str_cpy,".")<= 0) + { + free(str_org); + return NULL; + } if(*file == '.') trim(str_cpy,'.'); @@ -184,6 +188,7 @@ mime_t mime_from_type(const char* type) char* mime(const char* file) { char * ex = ext(file); + if(!ex) return "application/octet-stream"; mime_t m = mime_from_ext(ex); if(ex) free(ex); diff --git a/plugin_manager.c b/plugin_manager.c index 1c959b9..a6e2b57 100644 --- a/plugin_manager.c +++ b/plugin_manager.c @@ -31,10 +31,16 @@ struct plugin_entry *plugin_load(char *name) if ((np = plugin_lookup(name)) == NULL) { /* not found */ np = (struct plugin_entry *) malloc(sizeof(*np)); if (np == NULL || (np->pname = strdup(name)) == NULL) - return NULL; + { + if(np) free(np); + return NULL; + } if ((np->handle = plugin_from_file(name)) == NULL) + { + if(np) free(np); return NULL; - hashval = hash(name,HASHSIZE); + } + hashval = hash(name,HASHSIZE); np->next = plugin_table[hashval]; plugin_table[hashval] = np; } else /* already there */ @@ -53,7 +59,7 @@ void * plugin_from_file(char* name) { void *lib_handle; char* error; - char* path = __s("%s%s%s",server_config.plugins_dir,name,server_config.plugins_ext); + char* path = __s("%s%s%s",config()->plugins_dir,name,config()->plugins_ext); void (*fn)(const char*, config_t*); lib_handle = dlopen(path, RTLD_LAZY); if (!lib_handle) @@ -68,7 +74,7 @@ void * plugin_from_file(char* name) if ((error = dlerror()) != NULL) LOG("Problem when setting data path for %s : %s \n", name,error); else - (*fn)(name,&server_config); + (*fn)(name,config()); if(path) free(path); return lib_handle; @@ -79,17 +85,7 @@ void unload_plugin(struct plugin_entry* np) char* error; void (*fn)() = NULL; // find and execute the exit function - fn = (void (*)())dlsym(np->handle, "pexit"); - if ((error = dlerror()) != NULL) - { - LOG("Cant not find exit method from %s : %s \n", np->pname,error); - } - else - { - // execute it - (*fn)(); - } - fn = (void(*)()) dlsym(np->handle, "__release"); + fn = (void(*)()) dlsym(np->handle, "__release__"); if ((error = dlerror()) != NULL) { LOG("Cant not release plugin %s : %s \n", np->pname,error); diff --git a/plugin_manager.h b/plugin_manager.h index 466464f..cd88530 100644 --- a/plugin_manager.h +++ b/plugin_manager.h @@ -3,13 +3,12 @@ #include #include "libs/utils.h" #include "libs/handle.h" - +#include "http_server.h" struct plugin_entry { struct plugin_entry *next; char *pname; void *handle; }; -extern config_t server_config; /* lookup: look for s in hashtab */ struct plugin_entry *plugin_lookup(char *s); /* install: put (name, defn) in hashtab */ From c5b385d8b1cdb8dd4a4c833a83627c5312de4ac7 Mon Sep 17 00:00:00 2001 From: lxsang Date: Fri, 5 Oct 2018 19:05:15 +0200 Subject: [PATCH 10/15] clean up code --- http_server.c | 1 - 1 file changed, 1 deletion(-) diff --git a/http_server.c b/http_server.c index 70cb3d7..1c54ba3 100644 --- a/http_server.c +++ b/http_server.c @@ -688,7 +688,6 @@ dictionary decode_cookie(const char* line) if(token1 && token && strlen(token) > 0) { if(dic == NULL) dic = dict(); - LOG("%s: %s\n", token1, token); dput(dic,token1,strdup(token)); } } From f5070f6c8c029b2129811753c3db5b501bc58328 Mon Sep 17 00:00:00 2001 From: lxsang Date: Fri, 5 Oct 2018 19:07:14 +0200 Subject: [PATCH 11/15] clean up code --- libs/plugin.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/plugin.c b/libs/plugin.c index 31799c0..46a47ed 100644 --- a/libs/plugin.c +++ b/libs/plugin.c @@ -80,7 +80,7 @@ char* config_dir() void __release__() { destroy(); - printf("Releasing plugin\n"); + LOG("Releasing plugin\n"); if(__plugin__.name) free(__plugin__.name); if(__plugin__.dbpath) free(__plugin__.dbpath); if(__plugin__.htdocs) free(__plugin__.htdocs); From 2ca916449ea8c9f88b1d1cdd9ead9ac64774a33f Mon Sep 17 00:00:00 2001 From: lxsang Date: Sat, 6 Oct 2018 00:41:59 +0200 Subject: [PATCH 12/15] fix websocket bug --- http_server.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/http_server.c b/http_server.c index 1c54ba3..5c1c7f4 100644 --- a/http_server.c +++ b/http_server.c @@ -554,11 +554,10 @@ void* decode_request(void* data) if(ws && ws_key != NULL) { ws_confirm_request(rq->client, ws_key); - free(ws_key); // insert wsocket flag to request // plugin should handle this ugraded connection // not the server - dput(request,"__web_socket__",strdup("1")); + dput(rq->request,"__web_socket__",strdup("1")); } // resolve task task->handle = resolve_request; From 1cfb69691f490afcb4d9cce5d2f97710db611df1 Mon Sep 17 00:00:00 2001 From: lxsang Date: Sat, 6 Oct 2018 01:30:38 +0200 Subject: [PATCH 13/15] fix free pointer --- http_server.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/http_server.c b/http_server.c index 5c1c7f4..60db928 100644 --- a/http_server.c +++ b/http_server.c @@ -251,7 +251,7 @@ void* resolve_request(void* data) notfound(rq->client); return task; } - if(url) free(url); + //if(url) free(url); this is freed in the dput function url = newurl; dput(rq->request, "RESOURCE_PATH", url); } From 04fec05b701fbfbd8b84a789073029d4ea5b8ef4 Mon Sep 17 00:00:00 2001 From: lxsang Date: Sun, 7 Oct 2018 01:03:05 +0200 Subject: [PATCH 14/15] fix open ssl bug --- http_server.c | 69 ++++++++++++++++++++++++++++++++++++++---------- http_server.h | 1 - httpd.c | 8 +++--- libs/handle.c | 35 +++++++++++++++++++++++- libs/handle.h | 7 ++++- libs/scheduler.c | 1 + 6 files changed, 101 insertions(+), 20 deletions(-) diff --git a/http_server.c b/http_server.c index 60db928..c7ffb3b 100644 --- a/http_server.c +++ b/http_server.c @@ -125,12 +125,7 @@ void load_config(const char* file) } init_file_system(); } -void set_nonblock(int socket) { - int flags; - flags = fcntl(socket,F_GETFL,0); - //assert(flags != -1); - fcntl(socket, F_SETFL, flags | O_NONBLOCK); -} + void* accept_request(void* data) { @@ -144,28 +139,68 @@ void* accept_request(void* data) task = antd_create_task(NULL,(void*)rq,NULL); task->priority++; server_config.connection++; - fd_set read_flags; + fd_set read_flags, write_flags; // first verify if the socket is ready antd_client_t* client = (antd_client_t*) rq->client; FD_ZERO(&read_flags); FD_SET(rq->client->sock, &read_flags); + FD_ZERO(&write_flags); + FD_SET(rq->client->sock, &write_flags); struct timeval timeout; timeout.tv_sec = 0; timeout.tv_usec = 500; // select - int sel = select(client->sock+1, &read_flags, NULL, (fd_set*)0, &timeout); + int sel = select(client->sock+1, &read_flags, &write_flags, (fd_set*)0, &timeout); if(sel == -1) { unknow(rq->client); return task; } - if(sel == 0 || !FD_ISSET(client->sock, &read_flags) ) + if(sel == 0 || (!FD_ISSET(client->sock, &read_flags) && !FD_ISSET(client->sock, &write_flags))) { // retry it later server_config.connection--; task->handle = accept_request; return task; } + // perform the ssl handshake if enabled +#ifdef USE_OPENSSL + int ret,stat; + if(server_config.usessl == 1 && client->status == 0) + { + if (SSL_accept((SSL*)client->ssl) == -1) { + stat = SSL_get_error((SSL*)client->ssl, ret); + switch(stat) + { + case SSL_ERROR_WANT_READ: + case SSL_ERROR_WANT_WRITE: + case SSL_ERROR_NONE: + //LOG("RECALL %d\n", stat); + task->handle = accept_request; + task->priority = HIGH_PRIORITY; + server_config.connection--; + return task; + default: + LOG("ERRRRRRRRROR accept %d %d %d\n", stat, ret, ERR_get_error()); + ERR_print_errors_fp(stderr); + return task; + } + } + client->status = 1; + server_config.connection--; + task->handle = accept_request; + return task; + } + else + { + if(!FD_ISSET(client->sock, &read_flags)) + { + task->handle = accept_request; + server_config.connection--; + return task; + } + } +#endif count = read_buf(rq->client, buf, sizeof(buf)); //LOG("count is %d\n", count); line = buf; @@ -831,6 +866,8 @@ void* decode_multi_part_request_data(void* data) { int totalsize=0,len=0; //read until the next boundary + // TODO: this is not efficient for big file + // need a solution while((len = read_buf(rq->client,buf,sizeof(buf))) > 0 && strstr(buf,boundary) <= 0) { fwrite(buf, len, 1, fp); @@ -927,12 +964,16 @@ void decode_url_request(const char* query, dictionary dic) char* post_data_decode(void* client,int len) { char *query = (char*) malloc((len+1)*sizeof(char)); - for (int i = 0; i < len; i++) { - antd_recv(client, (query+i), 1); - } + char* ptr = query; + int readlen = len > BUFFLEN?BUFFLEN:len; + int read = 0; + while(readlen > 0) + { + read += antd_recv(client, query+read, readlen); + LOG("READ %d/%d\n", read, len); + readlen = (len - read) > BUFFLEN?BUFFLEN:(len-read); + } query[len]='\0'; - //query = url_decode(query); - //LOG("JSON Query %s\n", query); return query; } diff --git a/http_server.h b/http_server.h index 6a965c7..ed54a86 100644 --- a/http_server.h +++ b/http_server.h @@ -8,7 +8,6 @@ #include #include #include -#include #include "libs/handle.h" #include "libs/scheduler.h" #include "plugin_manager.h" diff --git a/httpd.c b/httpd.c index 89ba04f..54200ca 100644 --- a/httpd.c +++ b/httpd.c @@ -167,17 +167,19 @@ int main(int argc, char* argv[]) //LOG("Unclosed connection: %d\n", server_config->connection); #ifdef USE_OPENSSL client->ssl = NULL; + client->status = 0; if(config()->usessl == 1) { client->ssl = (void*)SSL_new(ctx); if(!client->ssl) continue; - SSL_set_fd((SSL*)client->ssl, client_sock); + SSL_set_fd((SSL*)client->ssl, client->sock); - if (SSL_accept((SSL*)client->ssl) <= 0) { + /*if (SSL_accept((SSL*)client->ssl) <= 0) { + LOG("EROOR accept\n"); ERR_print_errors_fp(stderr); antd_close(client); continue; - } + }*/ } #endif // create callback for the server diff --git a/libs/handle.c b/libs/handle.c index eea6f16..893d994 100644 --- a/libs/handle.c +++ b/libs/handle.c @@ -100,8 +100,28 @@ int antd_recv(void *src, void* data, int len) #ifdef USE_OPENSSL if(usessl()) { - //LOG("SSL READ\n"); + // TODO: blocking is not good, need a workaround + set_nonblock(source->sock); ret = SSL_read((SSL*) source->ssl, data, len); + set_nonblock(source->sock); + /* + int stat, r, st; + do{ + ret = SSL_read((SSL*) source->ssl, data, len); + stat = SSL_get_error((SSL*)source->ssl, r); + } while(ret == -1 && + ( + stat == SSL_ERROR_WANT_READ || + stat == SSL_ERROR_WANT_WRITE || + stat == SSL_ERROR_NONE || + (stat == SSL_ERROR_SYSCALL && r== 0 && !ERR_get_error()) + )); + if(ret == -1) + { + LOG("Problem reading %d %d %d\n", ret, stat, r); + } + //set_nonblock(source->sock); + */ } else { @@ -116,6 +136,19 @@ int antd_recv(void *src, void* data, int len) }*/ return ret; } +void set_nonblock(int socket) { + int flags; + flags = fcntl(socket,F_GETFL,0); + //assert(flags != -1); + fcntl(socket, F_SETFL, flags | O_NONBLOCK); +} +void set_block() +{ + int flags; + flags = fcntl(socket,F_GETFL,0); + //assert(flags != -1); + fcntl(socket, F_SETFL, flags & (~O_NONBLOCK)); +} int antd_close(void* src) { if(!src) return -1; diff --git a/libs/handle.h b/libs/handle.h index 61da196..9aa1738 100644 --- a/libs/handle.h +++ b/libs/handle.h @@ -11,6 +11,7 @@ #ifdef USE_DB #include "dbhelper.h" #endif +#include #include "dictionary.h" #include "list.h" #include "ini.h" @@ -33,6 +34,9 @@ typedef struct{ int sock; void* ssl; char* ip; +#ifdef USE_OPENSSL + int status; +#endif } antd_client_t; typedef struct { @@ -60,7 +64,8 @@ typedef struct { char* sslkey; #endif }config_t; - +void set_nonblock(int socket); +void set_block(int socket); int response(void*, const char*); void ctype(void*,const char*); void redirect(void*,const char*); diff --git a/libs/scheduler.c b/libs/scheduler.c index 0800231..7d59ee8 100644 --- a/libs/scheduler.c +++ b/libs/scheduler.c @@ -195,6 +195,7 @@ void antd_add_task(antd_scheduler_t* scheduler, antd_task_t* task) { // check if task is exist int prio = task->priority>N_PRIORITY-1?N_PRIORITY-1:task->priority; + //LOG("Prio is %d\n", prio); pthread_mutex_lock(&scheduler->scheduler_lock); enqueue(&scheduler->task_queue[prio], task); pthread_mutex_unlock(&scheduler->scheduler_lock); From 82d83ba665435c587eba050291f8ca68c3cb46ad Mon Sep 17 00:00:00 2001 From: lxsang Date: Sun, 7 Oct 2018 13:57:52 +0200 Subject: [PATCH 15/15] fix openssl bug on nonblocking mode --- http_server.c | 28 +++++++++------- libs/handle.c | 88 +++++++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 98 insertions(+), 18 deletions(-) diff --git a/http_server.c b/http_server.c index c7ffb3b..e768dbb 100644 --- a/http_server.c +++ b/http_server.c @@ -138,7 +138,6 @@ void* accept_request(void* data) task = antd_create_task(NULL,(void*)rq,NULL); task->priority++; - server_config.connection++; fd_set read_flags, write_flags; // first verify if the socket is ready antd_client_t* client = (antd_client_t*) rq->client; @@ -159,7 +158,6 @@ void* accept_request(void* data) if(sel == 0 || (!FD_ISSET(client->sock, &read_flags) && !FD_ISSET(client->sock, &write_flags))) { // retry it later - server_config.connection--; task->handle = accept_request; return task; } @@ -178,7 +176,6 @@ void* accept_request(void* data) //LOG("RECALL %d\n", stat); task->handle = accept_request; task->priority = HIGH_PRIORITY; - server_config.connection--; return task; default: LOG("ERRRRRRRRROR accept %d %d %d\n", stat, ret, ERR_get_error()); @@ -187,7 +184,6 @@ void* accept_request(void* data) } } client->status = 1; - server_config.connection--; task->handle = accept_request; return task; } @@ -196,11 +192,11 @@ void* accept_request(void* data) if(!FD_ISSET(client->sock, &read_flags)) { task->handle = accept_request; - server_config.connection--; return task; } } #endif + server_config.connection++; count = read_buf(rq->client, buf, sizeof(buf)); //LOG("count is %d\n", count); line = buf; @@ -966,14 +962,24 @@ char* post_data_decode(void* client,int len) char *query = (char*) malloc((len+1)*sizeof(char)); char* ptr = query; int readlen = len > BUFFLEN?BUFFLEN:len; - int read = 0; - while(readlen > 0) + int read = 0, stat = 1; + while(readlen > 0 && stat > 0) { - read += antd_recv(client, query+read, readlen); - LOG("READ %d/%d\n", read, len); - readlen = (len - read) > BUFFLEN?BUFFLEN:(len-read); + stat = antd_recv(client, ptr+read, readlen); + if(stat > 0) + { + read += stat; + readlen = (len - read) > BUFFLEN?BUFFLEN:(len-read); + } + } + + if(read > 0) + query[read]='\0'; + else + { + free(query); + query = NULL; } - query[len]='\0'; return query; } diff --git a/libs/handle.c b/libs/handle.c index 893d994..0a08f6f 100644 --- a/libs/handle.c +++ b/libs/handle.c @@ -95,15 +95,89 @@ int antd_send(void *src, const void* data, int len) int antd_recv(void *src, void* data, int len) { if(!src) return -1; - int ret; + int read; antd_client_t * source = (antd_client_t *) src; #ifdef USE_OPENSSL if(usessl()) { - // TODO: blocking is not good, need a workaround - set_nonblock(source->sock); - ret = SSL_read((SSL*) source->ssl, data, len); - set_nonblock(source->sock); + int received; + char* ptr = (char* )data; + int readlen = len > BUFFLEN?BUFFLEN:len; + read = 0; + fd_set fds; + struct timeval timeout; + while (readlen > 0) + { + received = SSL_read (source->ssl, ptr+read, readlen); + if (received > 0) + { + read += received; + readlen = (len - read) > BUFFLEN?BUFFLEN:(len-read); + } + else + { + //printf(" received equal to or less than 0\n") + int err = SSL_get_error(source->ssl, received); + switch (err) + { + case SSL_ERROR_NONE: + { + // no real error, just try again... + //LOG("SSL_ERROR_NONE \n"); + continue; + } + + case SSL_ERROR_ZERO_RETURN: + { + // peer disconnected... + //printf("SSL_ERROR_ZERO_RETURN \n"); + break; + } + + case SSL_ERROR_WANT_READ: + { + // no data available right now, wait a few seconds in case new data arrives... + //printf("SSL_ERROR_WANT_READ\n"); + + int sock = SSL_get_rfd(source->ssl); + FD_ZERO(&fds); + FD_SET(sock, &fds); + + timeout.tv_sec = 0; + timeout.tv_usec = 500; + err = select(sock+1, &fds, NULL, NULL, &timeout); + if (err == 0 || (err > 0 && FD_ISSET(sock, &fds))) + continue; // more data to read... + break; + } + + case SSL_ERROR_WANT_WRITE: + { + // socket not writable right now, wait a few seconds and try again... + //printf("SSL_ERROR_WANT_WRITE \n"); + int sock = SSL_get_wfd(source->ssl); + FD_ZERO(&fds); + FD_SET(sock, &fds); + + timeout.tv_sec = 0; + timeout.tv_usec = 500; + + err = select(sock+1, NULL, &fds, NULL, &timeout); + if (err == 0 || (err > 0 && FD_ISSET(sock, &fds))) + continue; // can write more data now... + break; + } + + default: + { + // other error + break; + } + } + + break; + } + } /* int stat, r, st; do{ @@ -126,7 +200,7 @@ int antd_recv(void *src, void* data, int len) else { #endif - ret = recv(((int) source->sock), data, len, 0); + read = recv(((int) source->sock), data, len, 0); #ifdef USE_OPENSSL } #endif @@ -134,7 +208,7 @@ int antd_recv(void *src, void* data, int len) { antd_close(src); }*/ - return ret; + return read; } void set_nonblock(int socket) { int flags;