diff --git a/Makefile b/Makefile index 3a60486..068ff55 100644 --- a/Makefile +++ b/Makefile @@ -16,7 +16,8 @@ LIBOBJS = libs/ini.o \ libs/utils.o \ libs/ws.o \ libs/sha1.o \ - libs/list.o + libs/list.o \ + libs/scheduler.o PLUGINSDEP = libs/plugin.o @@ -30,6 +31,9 @@ httpd: lib $(SERVER_O) $(CC) $(CFLAGS) $(SERVER_O) -o $(BUILDIRD)/httpd httpd.c $(SERVERLIB) cp antd $(BUILDIRD) +relay: lib $(SERVER_O) + $(CC) $(CFLAGS) $(SERVER_O) -o $(BUILDIRD)/relay relay.c $(SERVERLIB) + cp relayd $(BUILDIRD) lib: $(LIBOBJS) $(CC) $(CFLAGS) $(DB_LIB) $(SSL_LIB) -shared -o $(LIB_NAME).$(EXT) $(LIBOBJS) cp $(LIB_NAME).$(EXT) $(LIB_PATH$)/ diff --git a/libs/scheduler.c b/libs/scheduler.c new file mode 100644 index 0000000..0b04f1b --- /dev/null +++ b/libs/scheduler.c @@ -0,0 +1,301 @@ +#include "scheduler.h" + +/* +private data & methods +*/ +static antd_scheduler_t scheduler; + +static void enqueue(antd_task_t* task) +{ + // check if task is exist + int prio = task->priority>N_PRIORITY-1?N_PRIORITY-1:task->priority; + antd_task_queue_t q = scheduler.task_queue[prio]; + antd_task_item_t it = q; + while(it && it->task->id != task->id && it->next != NULL) + it = it->next; + if(it && it->task->id == task->id) + { + LOG("Task %d exists, ignore it\n", task->id); + return; + } + antd_task_item_t taski = (antd_task_item_t)malloc(sizeof *taski); + taski->task = task; + taski->next = NULL; + if(!it) // first task + { + scheduler.task_queue[prio] = taski; + } + else + { + it->next = taski; + } +} + +static int working() +{ + return scheduler.status; +} + +static void stop() +{ + pthread_mutex_lock(&scheduler.server_lock); + scheduler.status = 0; + pthread_mutex_unlock(&scheduler.server_lock); + for (int i = 0; i < scheduler.n_workers; i++) + pthread_join(scheduler.workers[i].pid, NULL); + if(scheduler.workers) free(scheduler.workers); +} + +static antd_task_item_t dequeue(int priority) +{ + int prio = priority>N_PRIORITY-1?N_PRIORITY-1:priority; + antd_task_item_t it = scheduler.task_queue[prio]; + if(it) + { + scheduler.task_queue[prio] = it->next; + } + return it; +} + +static antd_task_item_t next_task() +{ + antd_task_item_t it = NULL; + pthread_mutex_lock(&scheduler.server_lock); + for(int i = 0; i< N_PRIORITY; i++) + { + + it = dequeue(i); + if(it) break; + } + pthread_mutex_unlock(&scheduler.server_lock); + return it; +} + +static int available_workers() +{ + int n = 0; + pthread_mutex_lock(&scheduler.server_lock); + for(int i=0; i < scheduler.n_workers; i++) + if(scheduler.workers[i].status == 0) n++; + pthread_mutex_unlock(&scheduler.server_lock); + return n; +} + +static void work(void* data) +{ + antd_task_item_t it; + antd_worker_t* worker = (antd_worker_t*)data; + while(working()) + { + pthread_mutex_lock(&scheduler.server_lock); + worker->status = 0; + pthread_mutex_unlock(&scheduler.server_lock); + // fetch the next in queue + it = next_task(); + if(!it) continue; + //LOG("worker processing \n"); + pthread_mutex_lock(&scheduler.server_lock); + worker->status = 1; + pthread_mutex_unlock(&scheduler.server_lock); + // execute the task + antd_execute_task(it); + } +} + +static antd_callback_t* callback_of( void* (*callback)(void*) ) +{ + antd_callback_t* cb = NULL; + if(callback) + { + cb = (antd_callback_t*)malloc(sizeof *cb); + cb->handle = callback; + cb->next = NULL; + } + return cb; +} + +static void free_callback(antd_callback_t* cb) +{ + antd_callback_t* it = cb; + antd_callback_t* curr; + while(it) + { + curr = it; + it = it->next; + free(curr); + } +} + +static void enqueue_callback(antd_callback_t* cb, antd_callback_t* el) +{ + antd_callback_t* it = cb; + while(it && it->next != NULL) + it = it->next; + if(!it) return; // this should not happend + it->next = el; +} + +static void execute_callback(antd_task_t* task) +{ + antd_callback_t* cb = task->callback; + if(cb) + { + // call the first come call back + task->handle = cb->handle; + task->callback = task->callback->next; + free(cb); + antd_add_task(task); + } + else + { + free(task); + } +} + + +/* + Main API methods + init the main scheduler +*/ +void antd_scheduler_init(int n) +{ + time_t t; + srand((unsigned) time(&t)); + scheduler.n_workers = n; + scheduler.status = 1; + scheduler.workers = (antd_worker_t*)malloc(n*(sizeof(antd_worker_t))); + if(!scheduler.workers) + { + LOG("Cannot allocate memory for worker\n"); + exit(-1); + } + for(int i = 0; i < N_PRIORITY; i++) scheduler.task_queue[i] = NULL; + // create scheduler.workers + for(int i = 0; i < scheduler.n_workers;i++) + { + scheduler.workers[i].status = 0; + if (pthread_create(&scheduler.workers[i].pid , NULL,(void *(*)(void *))work, (void*)&scheduler.workers[i]) != 0) + { + scheduler.workers[i].status = -1; + perror("pthread_create: cannot create worker\n"); + } + } + LOG("Antd scheduler initialized with %d worker\n", scheduler.n_workers); +} +void antd_task_lock() +{ + pthread_mutex_lock(&scheduler.task_lock); +} +void antd_task_unlock() +{ + pthread_mutex_unlock(&scheduler.task_lock); +} +/* + destroy all pending task +*/ +void antd_scheduler_destroy() +{ + // free all the chains + antd_task_item_t it, curr; + for(int i=0; i < N_PRIORITY; i++) + { + it = scheduler.task_queue[i]; + while(it) + { + // first free the task + if(it->task && it->task->callback) free_callback(it->task->callback); + if(it->task) free(it->task); + // then free the placeholder + curr = it; + it = it->next; + free(curr); + } + } + stop(); +} + +/* + create a task +*/ +antd_task_t* antd_create_task(void* (*handle)(void*), void *data, void* (*callback)(void*)) +{ + antd_task_t* task = (antd_task_t*)malloc(sizeof *task); + task->stamp = (unsigned long)time(NULL); + task->id = rand(); + task->data = data; + task->handle = handle; + task->callback = callback_of(callback); + task->priority = NORMAL_PRIORITY; + return task; +} + +/* + scheduling a task +*/ +void antd_add_task(antd_task_t* task) +{ + pthread_mutex_lock(&scheduler.server_lock); + enqueue(task); + pthread_mutex_unlock(&scheduler.server_lock); +} + + +void antd_execute_task(antd_task_item_t taski) +{ + // execute the task + void *ret = (*(taski->task->handle))(taski->task->data); + // check the return data if it is a new task + if(!ret) + { + // call the first callback + execute_callback(taski->task); + free(taski); + } + else + { + antd_task_t* rtask = (antd_task_t*) ret; + if(taski->task->callback) + { + if(rtask->callback) + { + enqueue_callback(rtask->callback, taski->task->callback); + } + else + { + rtask->callback = taski->task->callback; + } + } + if(!rtask->handle) + { + // call the first callback + execute_callback(rtask); + free(taski->task); + free(taski); + } + else + { + antd_add_task(rtask); + free(taski->task); + free(taski); + } + } +} +int antd_scheduler_busy() +{ + int ret = 0; + if(available_workers() != scheduler.n_workers) return 1; + + pthread_mutex_lock(&scheduler.server_lock); + for(int i = 0; i < N_PRIORITY; i++) + if(scheduler.task_queue[i] != NULL) + { + ret = 1; + break; + } + pthread_mutex_unlock(&scheduler.server_lock); + return ret; +} +int antd_scheduler_status() +{ + return scheduler.status; +} \ No newline at end of file diff --git a/libs/scheduler.h b/libs/scheduler.h new file mode 100644 index 0000000..9d1ff65 --- /dev/null +++ b/libs/scheduler.h @@ -0,0 +1,97 @@ +#ifndef ANT_SCHEDULER +#define ANT_SCHEDULER + +#include "utils.h" +#include +// thread pool of workers +#define N_PRIORITY 10 +#define NORMAL_PRIORITY ((int)((N_PRIORITY - 1) / 2)) +#define LOW_PRIORITY (N_PRIORITY - 1) +#define HIGH_PRIORITY 0 + +// callback definition +typedef struct __callback_t{ + void* (*handle)(void*); + struct __callback_t * next; +} antd_callback_t; +// task definition +typedef struct { + /* + creation time of a task + */ + unsigned long stamp; + /* + unique id + */ + int id; + /* + priority from 0 to 9 + higher value is lower priority + */ + uint8_t priority; + /* + the callback + */ + void* (*handle)(void*); + antd_callback_t* callback; + /* + user data if any + */ + void * data; + +} antd_task_t; + +typedef struct { + pthread_t pid; + uint8_t status; // -1 quit, 0 available, 1 busy +} antd_worker_t; + + +typedef struct __task_item_t{ + antd_task_t* task; + struct __task_item_t* next; +}* antd_task_item_t; + +typedef antd_task_item_t antd_task_queue_t; + +typedef struct { + pthread_mutex_t server_lock; + pthread_mutex_t task_lock; + antd_task_queue_t task_queue[N_PRIORITY]; + uint8_t status; // 0 stop, 1 working + antd_worker_t* workers; + int n_workers; +} antd_scheduler_t; + +/* + init the main scheduler +*/ +void antd_scheduler_init(); +/* + destroy all pending task +*/ +void antd_scheduler_destroy(); + +/* + create a task +*/ +antd_task_t* antd_create_task(void* (*handle)(void*), void *data, void* (*callback)(void*)); + +/* + scheduling a task +*/ +void antd_add_task(antd_task_t*); + +void antd_task_lock(); +void antd_task_unlock(); +/* + Execute a task +*/ +int antd_scheduler_status(); +/* + execute and free a task a task +*/ +void antd_execute_task(antd_task_item_t); + +int antd_scheduler_busy(); +#endif \ No newline at end of file diff --git a/libs/utils.h b/libs/utils.h index 139df74..c9b8a3d 100644 --- a/libs/utils.h +++ b/libs/utils.h @@ -58,6 +58,9 @@ THE SOFTWARE. #else #define LOG(a,...) do{}while(0) #endif +// add this to the utils +#define UNUSED(x) (void)(x) + #define BUFFLEN 1024 #define HASHSIZE 1024 #define DHASHSIZE 50 diff --git a/relay.c b/relay.c new file mode 100644 index 0000000..455df91 --- /dev/null +++ b/relay.c @@ -0,0 +1,123 @@ +#include "http_server.h" +#include "libs/scheduler.h" +/* +this node is a relay from the http +to https +*/ +#define MATCH(s, n) strcmp(section, s) == 0 && strcmp(name, n) == 0 +int server_sock = -1; +void stop_serve(int dummy) { + UNUSED(dummy); + antd_scheduler_destroy(); + close(server_sock); +} +/* +HTTP/1.1 301 Moved Permanently +Location: http://www.example.org/ +Content-Type: text/html +Content-Length: 174 +*/ +void* antd_redirect(void* user_data) +{ + void** data = (void**)user_data; + void* client = data[0]; + char* host = (char*)data[1]; + __t(client,"%s", "HTTP/1.1 301 Moved Permanently"); + __t(client, "Location: https://%s", host); + __t(client, "%s", "Content-Type: text/html"); + __t(client, ""); + __t(client, "This page has moved to https://%s", host); + free(host); + free(user_data); + return antd_create_task(NULL,client, NULL); +} + +void* antd_free_client(void* client) +{ + antd_client_t * source = (antd_client_t *) client; + if(source->ip) free(source->ip); + close(source->sock); + free(client); + return NULL; +} + +void* antd_get_host(void * client) +{ + char buff[1024]; + char* line, *token; + char* host = NULL; + while((read_buf(client,buff,sizeof(buff))) && strcmp("\r\n",buff)) + { + line = buff; + trim(line, '\n'); + trim(line, '\r'); + token = strsep(&line, ":"); + trim(token,' '); + trim(line,' '); + if(token && strcasecmp(token,"Host")==0) + if(line) + { + host = strdup(line); + break; + } + } + if(!host) host = strdup("lxsang.me"); + void** data = (void**)malloc(2*(sizeof *data)); + data[0] = client; + data[1] = (void*)host; + LOG("Host is %s\n", host); + return antd_create_task(antd_redirect,data, NULL); +} + +int main(int argc, char* argv[]) +{ + UNUSED(argc); + UNUSED(argv); +// load the config first + unsigned port = 80; + int client_sock = -1; + struct sockaddr_in client_name; + socklen_t client_name_len = sizeof(client_name); + // ignore the broken PIPE error when writing + //or reading to/from a closed socked connection + signal(SIGPIPE, SIG_IGN); + signal(SIGABRT, SIG_IGN); + signal(SIGINT, stop_serve); + server_sock = startup(&port); + // 1 worker is + antd_scheduler_init(1); + LOG("httpd running on port %d\n", port); + + while (antd_scheduler_status()) + { + client_sock = accept(server_sock,(struct sockaddr *)&client_name,&client_name_len); + if (client_sock == -1) + { + perror("Cannot accept client request\n"); + continue; + } + antd_client_t* client = (antd_client_t*)malloc(sizeof(antd_client_t)); + // set timeout to socket + struct timeval timeout; + timeout.tv_sec = 0; + timeout.tv_usec = 500; + + if (setsockopt (client_sock, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout,sizeof(timeout)) < 0) + perror("setsockopt failed\n"); + + if (setsockopt (client_sock, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout,sizeof(timeout)) < 0) + perror("setsockopt failed\n"); + + /* + get the remote IP + */ + client->ip = NULL; + if (client_name.sin_family == AF_INET) + client->ip = strdup(inet_ntoa(client_name.sin_addr)); + client->sock = client_sock; + //accept_request(&client); + antd_add_task(antd_create_task(antd_get_host,(void*)client, antd_free_client )); + } + + return(0); +} \ No newline at end of file diff --git a/relayd b/relayd new file mode 100644 index 0000000..dd2f1cc --- /dev/null +++ b/relayd @@ -0,0 +1,8 @@ +#!/bin/sh +UNAME=`uname -s` + +if [ "$UNAME" = "Darwin" ]; then + DYLD_LIBRARY_PATH=$(dirname "$0")/plugins/ $(dirname "$0")/relay +else + LD_LIBRARY_PATH=$(dirname "$0")/plugins/ $(dirname "$0")/relay +fi