diff --git a/http_server.c b/http_server.c index 175684c..e6f2ab4 100644 --- a/http_server.c +++ b/http_server.c @@ -300,6 +300,7 @@ void *accept_request(void *data) antd_request_t *rq = (antd_request_t *)data; task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); + antd_task_bind_event(task,rq->client->sock,0, TASK_EVT_ON_WRITABLE| TASK_EVT_ON_READABLE); fd_set read_flags, write_flags; // first verify if the socket is ready antd_client_t *client = (antd_client_t *)rq->client; @@ -406,6 +407,7 @@ void *resolve_request(void *data) char path[2 * BUFFLEN]; antd_request_t *rq = (antd_request_t *)data; antd_task_t *task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); + antd_task_bind_event(task,rq->client->sock,0, TASK_EVT_ON_WRITABLE| TASK_EVT_ON_READABLE); char *url = (char *)dvalue(rq->request, "RESOURCE_PATH"); char *newurl = NULL; char *rqp = NULL; @@ -498,8 +500,6 @@ void *resolve_request(void *data) } else { - task->type = HEAVY; - // discard all request data dictionary_t headers = (dictionary_t)dvalue(rq->request, "REQUEST_HEADER"); if (headers) @@ -613,6 +613,7 @@ void *serve_file(void *data) { antd_request_t *rq = (antd_request_t *)data; antd_task_t *task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); + antd_task_bind_event(task,rq->client->sock,0, TASK_EVT_ON_WRITABLE| TASK_EVT_ON_READABLE); char *path = (char *)dvalue(rq->request, "ABS_RESOURCE_PATH"); char *mime_type = (char *)dvalue(rq->request, "RESOURCE_MIME"); rq->client->state = ANTD_CLIENT_SERVE_FILE; @@ -772,6 +773,7 @@ void *decode_request_header(void *data) dictionary_t request = dvalue(rq->request, "REQUEST_DATA"); char *port_s = (char *)dvalue(xheader, "SERVER_PORT"); port_config_t *pcnf = (port_config_t *)dvalue(server_config.ports, port_s); + antd_task_t * task; // first real all header // this for check if web socket is enabled @@ -805,7 +807,9 @@ void *decode_request_header(void *data) { antd_error(rq->client, 413, "Payload Too Large"); ERROR("Header size too large (%d): %d vs %d", rq->client->sock, header_size, HEADER_MAX_SIZE); - return antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); + task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); + antd_task_bind_event(task,rq->client->sock,0, TASK_EVT_ON_WRITABLE| TASK_EVT_ON_READABLE); + return task; } } // check for content length size @@ -819,7 +823,9 @@ void *decode_request_header(void *data) // dirty fix, wait for message to be sent // 100 ms sleep usleep(100000); - return antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); + task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); + antd_task_bind_event(task,rq->client->sock,0, TASK_EVT_ON_WRITABLE| TASK_EVT_ON_READABLE); + return task; } } @@ -863,7 +869,8 @@ void *decode_request_header(void *data) if (host) free(host); // header ok, now checkmethod - antd_task_t *task = antd_create_task(decode_request, (void *)rq, NULL, rq->client->last_io); + task = antd_create_task(decode_request, (void *)rq, NULL, rq->client->last_io); + antd_task_bind_event(task,rq->client->sock,0, TASK_EVT_ON_WRITABLE| TASK_EVT_ON_READABLE); return task; } @@ -882,6 +889,7 @@ void *decode_request(void *data) ws = 1; method = (char *)dvalue(rq->request, "METHOD"); task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); + antd_task_bind_event(task,rq->client->sock,0, TASK_EVT_ON_WRITABLE| TASK_EVT_ON_READABLE); if (strcmp(method, "GET") == 0 || strcmp(method, "HEAD") == 0 || strcmp(method, "OPTIONS") == 0) { //if(ctype) free(ctype); @@ -900,7 +908,6 @@ void *decode_request(void *data) else if (strcmp(method, "POST") == 0) { task->handle = resolve_request; - //task->type = HEAVY; return task; } else @@ -925,7 +932,7 @@ void *decode_post_request(void *data) clen = atoi(tmp); char *method = (char *)dvalue(rq->request, "METHOD"); task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); - task->type = HEAVY; + antd_task_bind_event(task,rq->client->sock,0, TASK_EVT_ON_WRITABLE| TASK_EVT_ON_READABLE); if (!method || strcmp(method, "POST") != 0) return task; if (ctype == NULL || clen == -1) @@ -1041,6 +1048,7 @@ void *decode_multi_part_request(void *data, const char *ctype) int len; antd_request_t *rq = (antd_request_t *)data; antd_task_t *task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); + antd_task_bind_event(task,rq->client->sock,0, TASK_EVT_ON_WRITABLE| TASK_EVT_ON_READABLE); //dictionary dic = NULL; boundary = strsep(&str_copy, "="); //discard first part boundary = str_copy; @@ -1057,7 +1065,6 @@ void *decode_multi_part_request(void *data, const char *ctype) task->handle = decode_multi_part_request_data; } } - task->type = HEAVY; return task; } void *decode_multi_part_request_data(void *data) @@ -1075,6 +1082,7 @@ void *decode_multi_part_request_data(void *data) char *token, *keytoken, *valtoken; antd_request_t *rq = (antd_request_t *)data; antd_task_t *task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); + antd_task_bind_event(task,rq->client->sock,0, TASK_EVT_ON_WRITABLE| TASK_EVT_ON_READABLE); char *boundary = (char *)dvalue(rq->request, "MULTI_PART_BOUNDARY"); dictionary_t dic = (dictionary_t)dvalue(rq->request, "REQUEST_DATA"); // search for content disposition: @@ -1202,7 +1210,6 @@ void *decode_multi_part_request_data(void *data) if (line && strstr(line, boundary)) { // continue upload - task->type = HEAVY; task->handle = decode_multi_part_request_data; } free(boundend); @@ -1296,6 +1303,7 @@ void *execute_plugin(void *data, const char *pname) char *error; antd_request_t *rq = (antd_request_t *)data; antd_task_t *task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); + antd_task_bind_event(task,rq->client->sock,0, TASK_EVT_ON_WRITABLE| TASK_EVT_ON_READABLE); //LOG("Plugin name '%s'", pname); rq->client->state = ANTD_CLIENT_PLUGIN_EXEC; //load the plugin @@ -1328,12 +1336,12 @@ void *execute_plugin(void *data, const char *pname) if (meta && meta->raw_body == 1) { task->handle = fn; - task->type = HEAVY; } else { free(task); task = antd_create_task(decode_post_request, (void *)rq, fn, rq->client->last_io); + antd_task_bind_event(task,rq->client->sock,0, TASK_EVT_ON_WRITABLE| TASK_EVT_ON_READABLE); } return task; } diff --git a/httpd.c b/httpd.c index 3d1ff0a..9b4a1d5 100644 --- a/httpd.c +++ b/httpd.c @@ -241,7 +241,7 @@ static void antd_monitor(port_config_t *pcnf) antd_scheduler_unlock(scheduler); // create callback for the server task = antd_create_task(accept_request, (void *)request, finish_request, client->last_io); - //task->type = LIGHT; + antd_task_bind_event(task,client->sock,0, TASK_EVT_ON_WRITABLE| TASK_EVT_ON_READABLE); antd_scheduler_add_task(scheduler, task); } } diff --git a/lib/scheduler.c b/lib/scheduler.c index 7d0f399..21f6671 100644 --- a/lib/scheduler.c +++ b/lib/scheduler.c @@ -5,12 +5,14 @@ #include #include #include +#include #include "scheduler.h" #include "utils.h" #include "bst.h" #define MAX_VALIDITY_INTERVAL 20 #define MAX_FIFO_NAME_SZ 255 +#define POLL_EVENT_TO 100 // ms // callback definition struct _antd_callback_t @@ -19,23 +21,26 @@ struct _antd_callback_t struct _antd_callback_t *next; }; -typedef struct { - int type; +typedef struct +{ + int flags; int fd; -} antd_scheduler_evt_item_t; + int timeout; // seconds + antd_task_t *task; +} antd_task_evt_item_t; struct _antd_queue_item_t { union { - antd_scheduler_evt_item_t* evt; + antd_task_evt_item_t *evt; antd_task_t *task; - void * raw_ptr; + void *raw_ptr; }; struct _antd_queue_item_t *next; -}; +}; -typedef struct _antd_queue_item_t* antd_queue_item_t; +typedef struct _antd_queue_item_t *antd_queue_item_t; typedef antd_queue_item_t antd_queue_t; @@ -70,7 +75,8 @@ struct _antd_scheduler_t static antd_callback_t *callback_of(void *(*callback)(void *)); static void antd_execute_task(antd_scheduler_t *, antd_task_t *); -static int antd_task_schedule(antd_scheduler_t *); +static void antd_task_schedule(antd_scheduler_t *scheduler, antd_task_t *task); +static void destroy_task(void *data); static void set_nonblock(int fd) { @@ -179,11 +185,12 @@ static void execute_callback(antd_scheduler_t *scheduler, antd_task_t *task) task->handle = cb->handle; task->callback = task->callback->next; free(cb); + //antd_task_bind_event(task, 0, 0, TASK_EVT_ALWAY_ON); antd_scheduler_add_task(scheduler, task); } else { - free(task); + destroy_task(task); } } @@ -193,24 +200,18 @@ static void destroy_queue(antd_queue_t q, int is_task) it = q; while (it) { - if(is_task) + if (is_task) { // first free the task - if (it->task && it->task->callback) - { - free_callback(it->task->callback); - it->task->callback = NULL; - } - if (it->task) - free(it->task); + destroy_task(it->task); } else { - if(it->raw_ptr) + if (it->raw_ptr) { free(it->raw_ptr); } - } + } // then free the placeholder curr = it; it = it->next; @@ -264,9 +265,6 @@ static void print_static_info(bst_node_t *node, void **args, int argc) snprintf(buffer, MAX_FIFO_NAME_SZ, "Current time: %lu\n", (unsigned long)time(NULL)); ret = write(*fdp, buffer, strlen(buffer)); - snprintf(buffer, MAX_FIFO_NAME_SZ, "Task type: %d\n", task->type); - ret = write(*fdp, buffer, strlen(buffer)); - if (task->handle) { snprintf(buffer, MAX_FIFO_NAME_SZ, "Has handle: yes\n"); @@ -460,8 +458,18 @@ antd_scheduler_t *antd_scheduler_init(int n, const char *stat_name) static void destroy_task(void *data) { antd_task_t *task = (antd_task_t *)data; - if (task && task->callback) + if (!task) + return; + if (task->callback) + { free_callback(task->callback); + task->callback = NULL; + } + if (task->events) + { + destroy_queue(task->events, 0); + task->events = NULL; + } if (task) free(task); } @@ -479,14 +487,14 @@ void antd_scheduler_destroy(antd_scheduler_t *scheduler) LOG("Destroy remaining queue"); bst_free_cb(scheduler->task_queue, destroy_task); scheduler->task_queue = NULL; - destroy_queue(scheduler->workers_queue,1); + destroy_queue(scheduler->workers_queue, 1); free(scheduler); } /* create a task */ -antd_task_t *antd_mktask(void *(*handle)(void *), void *data, void *(*callback)(void *), time_t atime, antd_task_type_t type) +antd_task_t *antd_create_task(void *(*handle)(void *), void *data, void *(*callback)(void *), time_t atime) { antd_task_t *task = (antd_task_t *)malloc(sizeof *task); task->stamp = (unsigned long)time(NULL); @@ -494,8 +502,8 @@ antd_task_t *antd_mktask(void *(*handle)(void *), void *data, void *(*callback)( task->handle = handle; task->id = antd_task_data_id(data); task->callback = callback_of(callback); - task->type = type; task->access_time = atime; + task->events = NULL; return task; } @@ -504,7 +512,7 @@ antd_task_t *antd_mktask(void *(*handle)(void *), void *data, void *(*callback)( */ void antd_scheduler_add_task(antd_scheduler_t *scheduler, antd_task_t *task) { - if(task->id == 0) + if (task->id == 0) task->id = antd_scheduler_next_id(scheduler, task->id); pthread_mutex_lock(&scheduler->scheduler_lock); scheduler->task_queue = bst_insert(scheduler->task_queue, task->id, (void *)task); @@ -541,17 +549,18 @@ static void antd_execute_task(antd_scheduler_t *scheduler, antd_task_t *task) { rtask->callback = task->callback; } + task->callback = NULL; } if (!rtask->handle) { // call the first callback execute_callback(scheduler, rtask); - free(task); + destroy_task(task); } else { antd_scheduler_add_task(scheduler, rtask); - free(task); + destroy_task(task); } } } @@ -571,23 +580,12 @@ void antd_scheduler_unlock(antd_scheduler_t *sched) pthread_mutex_unlock(&sched->scheduler_lock); } -static int antd_task_schedule(antd_scheduler_t *scheduler) +static void antd_task_schedule(antd_scheduler_t *scheduler, antd_task_t *task) { - // fetch next task from the task_queue - antd_task_t *task = NULL; - bst_node_t *node; - pthread_mutex_lock(&scheduler->scheduler_lock); - node = bst_find_min(scheduler->task_queue); - if (node) - { - task = (antd_task_t *)node->data; - scheduler->task_queue = bst_delete(scheduler->task_queue, node->key); - } - pthread_mutex_unlock(&scheduler->scheduler_lock); // no task if (!task) { - return 0; + return; } pthread_mutex_lock(&scheduler->pending_lock); scheduler->pending_task--; @@ -600,18 +598,11 @@ static int antd_task_schedule(antd_scheduler_t *scheduler) // data task is not valid LOG("Task is no longer valid and will be killed"); antd_scheduler_destroy_data(task->data); - if (task->callback) - { - free_callback(task->callback); - task->callback = NULL; - } - - free(task); - return 0; + destroy_task(task); + return; } - // check the type of task - if (task->type == LIGHT || scheduler->n_workers <= 0) + if (scheduler->n_workers <= 0) { // do it by myself antd_execute_task(scheduler, task); @@ -626,16 +617,170 @@ static int antd_task_schedule(antd_scheduler_t *scheduler) // wake up idle worker sem_post(scheduler->worker_sem); } - return 1; } -void* antd_scheduler_wait(void* ptr) + +static void task_polls_collect(bst_node_t* node, void** argv, int argc) { - int stat; - antd_scheduler_t *scheduler = (antd_scheduler_t *) ptr; + UNUSED(argc); + antd_task_evt_item_t* it = (antd_task_evt_item_t*)node->data; + struct pollfd* pfds = (struct pollfd*)argv[0]; + if(it) + { + pfds[node->key].fd = it->fd; + if(it->flags & TASK_EVT_ON_READABLE) + { + pfds[node->key].events |= POLLIN; + } + if(it->flags & TASK_EVT_ON_WRITABLE) + { + pfds[node->key].events |= POLLOUT; + } + } +} + +static void task_event_collect(bst_node_t* node, void** argv, int argc) +{ + UNUSED(argc); + antd_task_t* task = (antd_task_t*) node->data; + antd_queue_t* exec_list = (antd_queue_t*) argv[0]; + bst_node_t** poll_list = (bst_node_t**) argv[1]; + int* pollsize = (int*) argv[2]; + + if(!task->events) + { + enqueue(exec_list, task); + return; + } + antd_queue_item_t it = task->events; + while(it) + { + if(it->evt->flags & TASK_EVT_ALWAY_ON) + { + enqueue(exec_list, task); + } + else if(it->evt->flags & TASK_EVT_ON_TIMEOUT) + { + // check if timeout + if(difftime(time(NULL),task->stamp) > it->evt->timeout ) + { + enqueue(exec_list, task); + } + } + else + { + *poll_list = bst_insert(*poll_list, *pollsize, it->evt); + *pollsize = (*pollsize)+1; + } + it = it->next; + } +} + +void antd_task_bind_event(antd_task_t *task, int fd, int timeout, int flags) +{ + antd_task_evt_item_t *eit = (antd_task_evt_item_t *)malloc(sizeof(antd_task_evt_item_t)); + eit->fd = fd; + eit->timeout = timeout; + eit->flags = flags; + eit->task = task; + enqueue(&task->events, eit); +} + +void *antd_scheduler_wait(void *ptr) +{ + int pollsize, ready; + void *argv[3]; + antd_queue_t exec_list = NULL; + bst_node_t* poll_list = NULL; + antd_queue_item_t it = NULL; + antd_queue_item_t curr = NULL; + antd_task_evt_item_t *eit = NULL; + bst_node_t* node = NULL; + struct pollfd *pfds = NULL; + antd_scheduler_t *scheduler = (antd_scheduler_t *)ptr; + while (scheduler->status) { - stat = antd_task_schedule(scheduler); - if (!stat) + pollsize = 0; + argv[0] = &exec_list; + argv[1] = &poll_list; + argv[2] = &pollsize; + pthread_mutex_lock(&scheduler->scheduler_lock); + bst_for_each(scheduler->task_queue, task_event_collect, argv, 3); + pthread_mutex_unlock(&scheduler->scheduler_lock); + // schedule exec list first + it = exec_list; + while(it) + { + if(it->task) + { + + pthread_mutex_lock(&scheduler->scheduler_lock); + scheduler->task_queue = bst_delete(scheduler->task_queue, it->task->id); + pthread_mutex_unlock(&scheduler->scheduler_lock); + antd_task_schedule(scheduler, it->task); + } + curr = it; + it = it->next; + free(curr); + } + // Detect event on pollist + if(pollsize > 0) + { + pfds = (struct pollfd*)malloc(pollsize*sizeof(struct pollfd)); + memset(pfds, 0, pollsize*sizeof(struct pollfd)); + if(pfds) + { + argv[0] = pfds; + bst_for_each(poll_list,task_polls_collect, argv, 1); + // now poll event + ready = poll(pfds, pollsize, POLL_EVENT_TO); + if(ready == -1) + { + // this should not happends + ERROR("Unable to poll: %s", strerror(errno)); + // TODO: exit ? + } + else if(ready > 0) + { + for (int i = 0; i < pollsize; i++) + { + // find the event + node = bst_find(poll_list,i); + if(node) + eit = (antd_task_evt_item_t *)node->data; + if(eit) + { + if( ((eit->flags & TASK_EVT_ON_READABLE) && (pfds[i].revents & POLLIN)) + || ( (eit->flags & TASK_EVT_ON_WRITABLE) && (pfds[i].revents & POLLOUT)) + ) { + // event triggered schedule the task + pthread_mutex_lock(&scheduler->scheduler_lock); + scheduler->task_queue = bst_delete(scheduler->task_queue, eit->task->id); + pthread_mutex_unlock(&scheduler->scheduler_lock); + antd_task_schedule(scheduler, eit->task); + } + else if( (pfds[i].revents & POLLERR) || (pfds[i].revents & POLLHUP) ) { + // task is no longer available + ERROR("Poll: Task %d is no longer valid. Remove it", eit->task->id); + // remove task from task queue + pthread_mutex_lock(&scheduler->scheduler_lock); + scheduler->task_queue = bst_delete(scheduler->task_queue, eit->task->id); + pthread_mutex_unlock(&scheduler->scheduler_lock); + antd_scheduler_destroy_data(eit->task->data); + destroy_task(eit->task); + } + } + } + + } + free(pfds); + } + } + exec_list = NULL; + bst_free(poll_list); + poll_list = NULL; + + if (!scheduler->task_queue) { // no task found, go to idle state sem_wait(scheduler->scheduler_sem); @@ -672,7 +817,9 @@ void antd_scheduler_ext_statistic(int fd, void *data) } int antd_scheduler_validate_data(antd_task_t *task) { - return !(difftime(time(NULL), task->access_time) > MAX_VALIDITY_INTERVAL); + UNUSED(task); + return 1; + //!(difftime(time(NULL), task->access_time) > MAX_VALIDITY_INTERVAL); } void antd_scheduler_destroy_data(void *data) { diff --git a/lib/scheduler.h b/lib/scheduler.h index 7b77ad7..80bd063 100644 --- a/lib/scheduler.h +++ b/lib/scheduler.h @@ -5,17 +5,16 @@ #include #include -#define antd_create_task(h, d, c, t) (antd_mktask(h, d, c, t, HEAVY)) -typedef enum -{ - LIGHT, - HEAVY -} antd_task_type_t; +// define the event +#define TASK_EVT_ALWAY_ON 0x01 +#define TASK_EVT_ON_READABLE 0x02 +#define TASK_EVT_ON_WRITABLE 0x04 +#define TASK_EVT_ON_TIMEOUT 0x08 typedef struct _antd_scheduler_t antd_scheduler_t; typedef struct _antd_callback_t antd_callback_t; -typedef struct _antd_queue_item_t antd_scheduler_evt_t; +typedef struct _antd_queue_item_t* antd_task_evt_list_t; typedef struct { /** @@ -42,17 +41,11 @@ typedef struct * one or more event, otherwise it will be * rejected by the scheduler * */ - antd_scheduler_evt_t* events; + antd_task_evt_list_t events; /** * user data if any */ void *data; - /** - * type of a task - * light tasks are executed directly - * heavy tasks are delegated to workers - */ - antd_task_type_t type; } antd_task_t; /* * nit the main scheduler @@ -71,8 +64,17 @@ void antd_scheduler_destroy(antd_scheduler_t *); * - callback * - last data access time */ -antd_task_t *antd_mktask(void *(*handle)(void *), void *data, void *(*callback)(void *), time_t, antd_task_type_t type); +antd_task_t *antd_create_task(void *(*handle)(void *), void *data, void *(*callback)(void *), time_t); +/** + * ALWAY_ON flag doest not need a file descriptor, it will be executed immediately by the scheduler + * ANY file descriptor should work with READABLE and WRITABLE flags, including timerfd for precision timeout + * Timeout flag (in seconds precision): val is the number of seconds + * + * File descriptor close operation is not handled by the scheduler + * + * */ +void antd_task_bind_event(antd_task_t* task, int fd, int timeout, int flags); /** * add a task */ @@ -90,7 +92,7 @@ int antd_scheduler_ok(antd_scheduler_t *scheduler); * * wait for event */ -void* antd_scheduler_wait(void *); +void *antd_scheduler_wait(void *); /** * lock the scheduler @@ -99,7 +101,7 @@ void antd_scheduler_lock(antd_scheduler_t *); /** * Get next valid task id * */ -int antd_scheduler_next_id(antd_scheduler_t* sched, int input); +int antd_scheduler_next_id(antd_scheduler_t *sched, int input); /** * unlock the scheduler * */