From cbbc48d21669e4de213ffbf22240f7b3d16c531c Mon Sep 17 00:00:00 2001 From: lxsang Date: Sun, 3 Jan 2021 11:24:55 +0100 Subject: [PATCH] improvement scheduler --- http_server.c | 11 +- httpd.c | 36 +++-- lib/bst.c | 6 +- lib/bst.h | 4 +- lib/scheduler.c | 404 ++++++++++++++++++++++++++++++++---------------- lib/scheduler.h | 186 ++++++++++------------ lib/utils.c | 49 ++++++ lib/utils.h | 2 + 8 files changed, 436 insertions(+), 262 deletions(-) diff --git a/http_server.c b/http_server.c index 6a77bdc..175684c 100644 --- a/http_server.c +++ b/http_server.c @@ -679,7 +679,7 @@ int startup(unsigned *port) ERROR("Port %d - socket: %s", *port, strerror(errno)); return -1; } - + if (setsockopt(httpd, SOL_SOCKET, SO_REUSEADDR, &(int){1}, sizeof(int)) == -1) { ERROR("Unable to set reuse address on port %d - setsockopt: %s", *port, strerror(errno)); @@ -848,7 +848,7 @@ void *decode_request_header(void *data) #endif //if(line) free(line); memset(buf, 0, sizeof(buf)); - strcat(buf, url); + strncat(buf, url, sizeof(buf) - 1); LOG("Original query (%d): %s", rq->client->sock, url); query = apply_rules(pcnf->rules, host, buf); LOG("Processed query: %s", query); @@ -973,8 +973,11 @@ void ws_confirm_request(void *client, const char *key) char rkey[128]; char sha_d[20]; char base64[64]; - strncpy(rkey, key, 128); - strcat(rkey, WS_MAGIC_STRING); + strncpy(rkey, key, sizeof(rkey)-1); + int n = (int)sizeof(rkey) - (int)strlen(key); + if (n < 0) + n = 0; + strncat(rkey, WS_MAGIC_STRING, n); //printf("RESPONDKEY '%s'\n", rkey); #ifdef USE_OPENSSL SHA_CTX context; diff --git a/httpd.c b/httpd.c index 885006b..3d1ff0a 100644 --- a/httpd.c +++ b/httpd.c @@ -21,7 +21,7 @@ snprintf(buff, BUFFLEN, ##__VA_ARGS__); \ ret = write(fd, buff, strlen(buff)); -static antd_scheduler_t scheduler; +static antd_scheduler_t* scheduler; #ifdef USE_OPENSSL @@ -148,7 +148,7 @@ static void stop_serve(int dummy) sigaddset(&mask, SIGPIPE); sigaddset(&mask, SIGABRT); sigprocmask(SIG_BLOCK, &mask, NULL); - antd_scheduler_destroy(&scheduler); + antd_scheduler_destroy(scheduler); unload_all_plugin(); #ifdef USE_OPENSSL FIPS_mode_set(0); @@ -236,18 +236,18 @@ static void antd_monitor(port_config_t *pcnf) }*/ } #endif - pthread_mutex_lock(&scheduler.scheduler_lock); + antd_scheduler_lock(scheduler); conf->connection++; - pthread_mutex_unlock(&scheduler.scheduler_lock); + antd_scheduler_unlock(scheduler); // create callback for the server task = antd_create_task(accept_request, (void *)request, finish_request, client->last_io); //task->type = LIGHT; - antd_add_task(&scheduler, task); + antd_scheduler_add_task(scheduler, task); } } } -static void client_statistic(int fd, void *user_data) +void antd_scheduler_ext_statistic(int fd, void *user_data) { antd_request_t *request = (antd_request_t *)user_data; chain_t it, it1; @@ -295,6 +295,19 @@ static void client_statistic(int fd, void *user_data) UNUSED(ret); } +void antd_scheduler_destroy_data(void *data) +{ + finish_request(data); +} + +int antd_task_data_id(void *data) +{ + antd_request_t *rq = (antd_request_t *)data; + if(!rq) + return 0; + return antd_scheduler_next_id(scheduler,rq->client->sock); +} + int main(int argc, char *argv[]) { pthread_t sched_th; @@ -333,11 +346,8 @@ int main(int argc, char *argv[]) #endif // enable scheduler // default to 4 workers - scheduler.validate_data = 1; - scheduler.destroy_data = finish_request; - strncpy(scheduler.stat_fifo, conf->stat_fifo_path, MAX_FIFO_NAME_SZ); - scheduler.stat_data_cb = client_statistic; - if (antd_scheduler_init(&scheduler, conf->n_workers) == -1) + scheduler = antd_scheduler_init( conf->n_workers, conf->stat_fifo_path); + if (scheduler == NULL) { ERROR("Unable to initialise scheduler. Exit"); stop_serve(0); @@ -371,7 +381,7 @@ int main(int argc, char *argv[]) exit(1); } // Start scheduler - if (pthread_create(&sched_th, NULL, (void *(*)(void *))antd_wait, (void *)&scheduler) != 0) + if (pthread_create(&sched_th, NULL, (void *(*)(void *))antd_scheduler_wait, (void *)scheduler) != 0) { ERROR("pthread_create: cannot start scheduler thread"); stop_serve(0); @@ -383,7 +393,7 @@ int main(int argc, char *argv[]) pthread_detach(sched_th); } - while (scheduler.status) + while (antd_scheduler_ok(scheduler)) { if (conf->connection > conf->maxcon) { diff --git a/lib/bst.c b/lib/bst.c index c6d0b63..241c9f7 100644 --- a/lib/bst.c +++ b/lib/bst.c @@ -3,12 +3,16 @@ #include "bst.h" -void bst_free(bst_node_t* root) +void bst_free_cb(bst_node_t* root, void (*cb)(void*)) { if(root != NULL) { bst_free(root->left); bst_free(root->right); + if(root->data && cb) + { + cb(root->data); + } free(root); } } diff --git a/lib/bst.h b/lib/bst.h index f8c5be5..8d13db6 100644 --- a/lib/bst.h +++ b/lib/bst.h @@ -1,6 +1,6 @@ #ifndef BST_H #define BST_H 1 - +#define bst_free(n) (bst_free_cb(n, NULL)) typedef struct _tree_node { int key; @@ -9,7 +9,7 @@ typedef struct _tree_node struct _tree_node* right; } bst_node_t; -void bst_free(bst_node_t* root); +void bst_free_cb(bst_node_t* root, void (*callback)(void*)); bst_node_t* bst_insert(bst_node_t* root, int key, void* data); bst_node_t* bst_find_min(bst_node_t* root); bst_node_t* bst_find_max(bst_node_t* root); diff --git a/lib/scheduler.c b/lib/scheduler.c index 762f32e..7d0f399 100644 --- a/lib/scheduler.c +++ b/lib/scheduler.c @@ -7,6 +7,70 @@ #include #include "scheduler.h" #include "utils.h" +#include "bst.h" + +#define MAX_VALIDITY_INTERVAL 20 +#define MAX_FIFO_NAME_SZ 255 + +// callback definition +struct _antd_callback_t +{ + void *(*handle)(void *); + struct _antd_callback_t *next; +}; + +typedef struct { + int type; + int fd; +} antd_scheduler_evt_item_t; + +struct _antd_queue_item_t +{ + union + { + antd_scheduler_evt_item_t* evt; + antd_task_t *task; + void * raw_ptr; + }; + struct _antd_queue_item_t *next; +}; + +typedef struct _antd_queue_item_t* antd_queue_item_t; + +typedef antd_queue_item_t antd_queue_t; + +typedef struct +{ + int id; + pthread_t tid; + void *manager; +} antd_worker_t; + +struct _antd_scheduler_t +{ + // data lock + pthread_mutex_t scheduler_lock; + pthread_mutex_t worker_lock; + pthread_mutex_t pending_lock; + // event handle + sem_t *scheduler_sem; + sem_t *worker_sem; + // worker and data + bst_node_t *task_queue; + antd_queue_t workers_queue; + uint8_t status; // 0 stop, 1 working + antd_worker_t *workers; + int n_workers; + int pending_task; + int id_allocator; + char stat_fifo[MAX_FIFO_NAME_SZ]; + int stat_fd; + pthread_t stat_tid; +}; + +static antd_callback_t *callback_of(void *(*callback)(void *)); +static void antd_execute_task(antd_scheduler_t *, antd_task_t *); +static int antd_task_schedule(antd_scheduler_t *); static void set_nonblock(int fd) { @@ -19,21 +83,21 @@ static void set_nonblock(int fd) fcntl(fd, F_SETFL, flags | O_NONBLOCK); } -static void enqueue(antd_task_queue_t *q, antd_task_t *task) +static void enqueue(antd_queue_t *q, void *data) { - antd_task_item_t it = *q; + antd_queue_item_t it = *q; while (it && it->next != NULL) it = it->next; - antd_task_item_t taski = (antd_task_item_t)malloc(sizeof *taski); - taski->task = task; - taski->next = NULL; + antd_queue_item_t new_it = (antd_queue_item_t)malloc(sizeof *new_it); + new_it->raw_ptr = data; + new_it->next = NULL; if (!it) // first task { - *q = taski; + *q = new_it; } else { - it->next = taski; + it->next = new_it; } } @@ -61,9 +125,9 @@ static void stop(antd_scheduler_t *scheduler) sem_close(scheduler->worker_sem); } -static antd_task_item_t dequeue(antd_task_queue_t *q) +static antd_queue_item_t dequeue(antd_queue_t *q) { - antd_task_item_t it = *q; + antd_queue_item_t it = *q; if (it) { *q = it->next; @@ -72,7 +136,7 @@ static antd_task_item_t dequeue(antd_task_queue_t *q) return it; } -antd_callback_t *callback_of(void *(*callback)(void *)) +static antd_callback_t *callback_of(void *(*callback)(void *)) { antd_callback_t *cb = NULL; if (callback) @@ -114,13 +178,8 @@ static void execute_callback(antd_scheduler_t *scheduler, antd_task_t *task) // call the first come call back task->handle = cb->handle; task->callback = task->callback->next; - task->priority = task->priority + 1; - if (task->priority > N_PRIORITY - 1) - { - task->priority = N_PRIORITY - 1; - } free(cb); - antd_add_task(scheduler, task); + antd_scheduler_add_task(scheduler, task); } else { @@ -128,17 +187,30 @@ static void execute_callback(antd_scheduler_t *scheduler, antd_task_t *task) } } -static void destroy_queue(antd_task_queue_t q) +static void destroy_queue(antd_queue_t q, int is_task) { - antd_task_item_t it, curr; + antd_queue_item_t it, curr; it = q; while (it) { - // first free the task - if (it->task && it->task->callback) - free_callback(it->task->callback); - if (it->task) - free(it->task); + if(is_task) + { + // first free the task + if (it->task && it->task->callback) + { + free_callback(it->task->callback); + it->task->callback = NULL; + } + if (it->task) + free(it->task); + } + else + { + if(it->raw_ptr) + { + free(it->raw_ptr); + } + } // then free the placeholder curr = it; it = it->next; @@ -150,7 +222,7 @@ static void *work(antd_worker_t *worker) antd_scheduler_t *scheduler = (antd_scheduler_t *)worker->manager; while (scheduler->status) { - antd_task_item_t it; + antd_queue_item_t it; pthread_mutex_lock(&scheduler->worker_lock); it = dequeue(&scheduler->workers_queue); pthread_mutex_unlock(&scheduler->worker_lock); @@ -165,18 +237,57 @@ static void *work(antd_worker_t *worker) else { //LOG("task executed by worker %d\n", worker->id); - antd_execute_task(scheduler, it); + antd_execute_task(scheduler, it->task); + free(it); } } return NULL; } +static void print_static_info(bst_node_t *node, void **args, int argc) +{ + if (argc != 2) + { + return; + } + int ret; + char *buffer = args[0]; + int *fdp = args[1]; + antd_task_t *task = (antd_task_t *)node->data; + // send statistic on task data + snprintf(buffer, MAX_FIFO_NAME_SZ, "---- Task %d created at: %lu ----\n", task->id, task->stamp); + ret = write(*fdp, buffer, strlen(buffer)); + // send statistic on task data + snprintf(buffer, MAX_FIFO_NAME_SZ, "Access time: %lu\nn", (unsigned long)task->access_time); + ret = write(*fdp, buffer, strlen(buffer)); + + snprintf(buffer, MAX_FIFO_NAME_SZ, "Current time: %lu\n", (unsigned long)time(NULL)); + ret = write(*fdp, buffer, strlen(buffer)); + + snprintf(buffer, MAX_FIFO_NAME_SZ, "Task type: %d\n", task->type); + ret = write(*fdp, buffer, strlen(buffer)); + + if (task->handle) + { + snprintf(buffer, MAX_FIFO_NAME_SZ, "Has handle: yes\n"); + ret = write(*fdp, buffer, strlen(buffer)); + } + + if (task->callback) + { + snprintf(buffer, MAX_FIFO_NAME_SZ, "Has callback: yes\n"); + ret = write(*fdp, buffer, strlen(buffer)); + } + UNUSED(ret); + // now print all task data statistic + antd_scheduler_ext_statistic(*fdp, task->data); +} static void *statistic(antd_scheduler_t *scheduler) { fd_set fd_out; int ret; char buffer[MAX_FIFO_NAME_SZ]; - antd_task_item_t it; + void *argc[2]; while (scheduler->status) { if (scheduler->stat_fd == -1) @@ -192,6 +303,8 @@ static void *statistic(antd_scheduler_t *scheduler) set_nonblock(scheduler->stat_fd); } } + argc[0] = buffer; + argc[1] = &scheduler->stat_fd; FD_ZERO(&fd_out); FD_SET(scheduler->stat_fd, &fd_out); ret = select(scheduler->stat_fd + 1, NULL, &fd_out, NULL, NULL); @@ -215,48 +328,8 @@ static void *statistic(antd_scheduler_t *scheduler) snprintf(buffer, MAX_FIFO_NAME_SZ, "Pending task: %d. Detail:\n", scheduler->pending_task); ret = write(scheduler->stat_fd, buffer, strlen(buffer)); - for (int i = 0; i < N_PRIORITY; i++) - { - snprintf(buffer, MAX_FIFO_NAME_SZ, "#### PRIORITY: %d\n", i); - ret = write(scheduler->stat_fd, buffer, strlen(buffer)); + bst_for_each(scheduler->task_queue, print_static_info, argc, 2); - it = scheduler->task_queue[i]; - while (it) - { - // send statistic on task data - snprintf(buffer, MAX_FIFO_NAME_SZ, "---- Task created at: %lu ----\n", it->task->stamp); - ret = write(scheduler->stat_fd, buffer, strlen(buffer)); - - // send statistic on task data - snprintf(buffer, MAX_FIFO_NAME_SZ, "Access time: %lu\nn", (unsigned long)it->task->access_time); - ret = write(scheduler->stat_fd, buffer, strlen(buffer)); - - snprintf(buffer, MAX_FIFO_NAME_SZ, "Current time: %lu\n", (unsigned long)time(NULL)); - ret = write(scheduler->stat_fd, buffer, strlen(buffer)); - - snprintf(buffer, MAX_FIFO_NAME_SZ, "Task type: %d\n", it->task->type); - ret = write(scheduler->stat_fd, buffer, strlen(buffer)); - - if (it->task->handle) - { - snprintf(buffer, MAX_FIFO_NAME_SZ, "Has handle: yes\n"); - ret = write(scheduler->stat_fd, buffer, strlen(buffer)); - } - - if (it->task->callback) - { - snprintf(buffer, MAX_FIFO_NAME_SZ, "Has callback: yes\n"); - ret = write(scheduler->stat_fd, buffer, strlen(buffer)); - } - - // now print all task data statistic - if (scheduler->stat_data_cb) - { - scheduler->stat_data_cb(scheduler->stat_fd, it->task->data); - } - it = it->next; - } - } pthread_mutex_unlock(&scheduler->scheduler_lock); ret = close(scheduler->stat_fd); scheduler->stat_fd = -1; @@ -265,7 +338,7 @@ static void *statistic(antd_scheduler_t *scheduler) else { ret = write(scheduler->stat_fd, ".", 1); - if(ret == -1) + if (ret == -1) { ret = close(scheduler->stat_fd); scheduler->stat_fd = -1; @@ -279,8 +352,8 @@ static void *statistic(antd_scheduler_t *scheduler) } else { - ret = close(scheduler->stat_fd); - scheduler->stat_fd = -1; + ret = close(scheduler->stat_fd); + scheduler->stat_fd = -1; } break; } @@ -302,36 +375,40 @@ static void *statistic(antd_scheduler_t *scheduler) init the main scheduler */ -int antd_scheduler_init(antd_scheduler_t *scheduler, int n) +antd_scheduler_t *antd_scheduler_init(int n, const char *stat_name) { + antd_scheduler_t *scheduler = (antd_scheduler_t *)malloc(sizeof(antd_scheduler_t)); scheduler->n_workers = n; scheduler->status = 1; scheduler->workers_queue = NULL; scheduler->pending_task = 0; - // scheduler->validate_data = 0; - // scheduler->destroy_data = NULL; scheduler->stat_fd = -1; - //scheduler->stat_data_cb = NULL; - //memset(scheduler->stat_fifo, 0, MAX_FIFO_NAME_SZ); + scheduler->id_allocator = 0; + (void)memset(scheduler->stat_fifo, 0, MAX_FIFO_NAME_SZ); + if (stat_name) + { + (void)strncpy(scheduler->stat_fifo, stat_name, MAX_FIFO_NAME_SZ - 1); + } // init semaphore scheduler->scheduler_sem = sem_open("scheduler", O_CREAT, 0600, 0); if (scheduler->scheduler_sem == SEM_FAILED) { ERROR("Cannot open semaphore for scheduler"); - return -1; + free(scheduler); + return NULL; } scheduler->worker_sem = sem_open("worker", O_CREAT, 0600, 0); if (!scheduler->worker_sem) { ERROR("Cannot open semaphore for workers"); - return -1; + free(scheduler); + return NULL; } // init lock pthread_mutex_init(&scheduler->scheduler_lock, NULL); pthread_mutex_init(&scheduler->worker_lock, NULL); pthread_mutex_init(&scheduler->pending_lock, NULL); - for (int i = 0; i < N_PRIORITY; i++) - scheduler->task_queue[i] = NULL; + scheduler->task_queue = NULL; // create scheduler.workers if (n > 0) { @@ -339,7 +416,8 @@ int antd_scheduler_init(antd_scheduler_t *scheduler, int n) if (!scheduler->workers) { ERROR("Cannot allocate memory for worker"); - return -1; + free(scheduler); + return NULL; } for (int i = 0; i < scheduler->n_workers; i++) { @@ -348,7 +426,8 @@ int antd_scheduler_init(antd_scheduler_t *scheduler, int n) if (pthread_create(&scheduler->workers[i].tid, NULL, (void *(*)(void *))work, (void *)&scheduler->workers[i]) != 0) { ERROR("pthread_create: cannot create worker: %s", strerror(errno)); - return -1; + free(scheduler); + return NULL; } else { @@ -375,37 +454,47 @@ int antd_scheduler_init(antd_scheduler_t *scheduler, int n) } } LOG("Antd scheduler initialized with %d worker", scheduler->n_workers); - return 0; + return scheduler; } + +static void destroy_task(void *data) +{ + antd_task_t *task = (antd_task_t *)data; + if (task && task->callback) + free_callback(task->callback); + if (task) + free(task); +} + /* destroy all pending task pthread_mutex_lock(&scheduler.queue_lock); */ void antd_scheduler_destroy(antd_scheduler_t *scheduler) { + if (!scheduler) + return; // free all the chains stop(scheduler); LOG("Destroy remaining queue"); - for (int i = 0; i < N_PRIORITY; i++) - { - destroy_queue(scheduler->task_queue[i]); - } - destroy_queue(scheduler->workers_queue); + bst_free_cb(scheduler->task_queue, destroy_task); + scheduler->task_queue = NULL; + destroy_queue(scheduler->workers_queue,1); + free(scheduler); } /* create a task */ -antd_task_t *antd_create_task(void *(*handle)(void *), void *data, void *(*callback)(void *), time_t atime) +antd_task_t *antd_mktask(void *(*handle)(void *), void *data, void *(*callback)(void *), time_t atime, antd_task_type_t type) { antd_task_t *task = (antd_task_t *)malloc(sizeof *task); task->stamp = (unsigned long)time(NULL); task->data = data; task->handle = handle; + task->id = antd_task_data_id(data); task->callback = callback_of(callback); - task->priority = HIGH_PRIORITY; - task->type = HEAVY; - //task->type = LIGHT; + task->type = type; task->access_time = atime; return task; } @@ -413,13 +502,12 @@ antd_task_t *antd_create_task(void *(*handle)(void *), void *data, void *(*callb /* scheduling a task */ -void antd_add_task(antd_scheduler_t *scheduler, antd_task_t *task) +void antd_scheduler_add_task(antd_scheduler_t *scheduler, antd_task_t *task) { - // check if task is exist - int prio = task->priority > N_PRIORITY - 1 ? N_PRIORITY - 1 : task->priority; - //LOG("Prio is %d\n", prio); + if(task->id == 0) + task->id = antd_scheduler_next_id(scheduler, task->id); pthread_mutex_lock(&scheduler->scheduler_lock); - enqueue(&scheduler->task_queue[prio], task); + scheduler->task_queue = bst_insert(scheduler->task_queue, task->id, (void *)task); pthread_mutex_unlock(&scheduler->scheduler_lock); pthread_mutex_lock(&scheduler->pending_lock); scheduler->pending_task++; @@ -428,50 +516,42 @@ void antd_add_task(antd_scheduler_t *scheduler, antd_task_t *task) sem_post(scheduler->scheduler_sem); } -void antd_execute_task(antd_scheduler_t *scheduler, antd_task_item_t taski) +static void antd_execute_task(antd_scheduler_t *scheduler, antd_task_t *task) { - if (!taski) + if (!task) return; // execute the task - void *ret = (*(taski->task->handle))(taski->task->data); + void *ret = (*(task->handle))(task->data); // check the return data if it is a new task if (!ret) { // call the first callback - execute_callback(scheduler, taski->task); - free(taski); + execute_callback(scheduler, task); } else { antd_task_t *rtask = (antd_task_t *)ret; - if (taski->task->callback) + if (task->callback) { if (rtask->callback) { - enqueue_callback(rtask->callback, taski->task->callback); + enqueue_callback(rtask->callback, task->callback); } else { - rtask->callback = taski->task->callback; + rtask->callback = task->callback; } } if (!rtask->handle) { // call the first callback execute_callback(scheduler, rtask); - free(taski->task); - free(taski); + free(task); } else { - rtask->priority = taski->task->priority + 1; - if (rtask->priority > N_PRIORITY - 1) - { - rtask->priority = N_PRIORITY - 1; - } - antd_add_task(scheduler, rtask); - free(taski->task); - free(taski); + antd_scheduler_add_task(scheduler, rtask); + free(task); } } } @@ -481,21 +561,31 @@ int antd_scheduler_busy(antd_scheduler_t *scheduler) return scheduler->pending_task != 0; } -int antd_task_schedule(antd_scheduler_t *scheduler) +void antd_scheduler_lock(antd_scheduler_t *sched) +{ + pthread_mutex_lock(&sched->scheduler_lock); +} + +void antd_scheduler_unlock(antd_scheduler_t *sched) +{ + pthread_mutex_unlock(&sched->scheduler_lock); +} + +static int antd_task_schedule(antd_scheduler_t *scheduler) { // fetch next task from the task_queue - antd_task_item_t it = NULL; + antd_task_t *task = NULL; + bst_node_t *node; pthread_mutex_lock(&scheduler->scheduler_lock); - for (int i = 0; i < N_PRIORITY; i++) + node = bst_find_min(scheduler->task_queue); + if (node) { - - it = dequeue(&scheduler->task_queue[i]); - if (it) - break; + task = (antd_task_t *)node->data; + scheduler->task_queue = bst_delete(scheduler->task_queue, node->key); } pthread_mutex_unlock(&scheduler->scheduler_lock); // no task - if (!it) + if (!task) { return 0; } @@ -504,41 +594,44 @@ int antd_task_schedule(antd_scheduler_t *scheduler) pthread_mutex_unlock(&scheduler->pending_lock); // has the task now // validate the task - if (scheduler->validate_data && difftime(time(NULL), it->task->access_time) > MAX_VALIDITY_INTERVAL && it->task->priority == N_PRIORITY - 1) + //if (scheduler->validate_data && difftime(time(NULL), it->task->access_time) > MAX_VALIDITY_INTERVAL && it->task->priority == N_PRIORITY - 1) + if (antd_scheduler_validate_data(task) == 0) { // data task is not valid LOG("Task is no longer valid and will be killed"); - if (scheduler->destroy_data) - scheduler->destroy_data(it->task->data); - if (it->task->callback) - free_callback(it->task->callback); - free(it->task); - free(it); + antd_scheduler_destroy_data(task->data); + if (task->callback) + { + free_callback(task->callback); + task->callback = NULL; + } + + free(task); return 0; } // check the type of task - if (it->task->type == LIGHT || scheduler->n_workers <= 0) + if (task->type == LIGHT || scheduler->n_workers <= 0) { // do it by myself - antd_execute_task(scheduler, it); + antd_execute_task(scheduler, task); } else { // delegate to other workers by //pushing to the worker queue pthread_mutex_lock(&scheduler->worker_lock); - enqueue(&scheduler->workers_queue, it->task); + enqueue(&scheduler->workers_queue, task); pthread_mutex_unlock(&scheduler->worker_lock); // wake up idle worker sem_post(scheduler->worker_sem); - free(it); } return 1; } -void antd_wait(antd_scheduler_t *scheduler) +void* antd_scheduler_wait(void* ptr) { int stat; + antd_scheduler_t *scheduler = (antd_scheduler_t *) ptr; while (scheduler->status) { stat = antd_task_schedule(scheduler); @@ -548,4 +641,47 @@ void antd_wait(antd_scheduler_t *scheduler) sem_wait(scheduler->scheduler_sem); } } + return NULL; } + +int antd_scheduler_ok(antd_scheduler_t *scheduler) +{ + return scheduler->status; +} +int antd_scheduler_next_id(antd_scheduler_t *sched, int input) +{ + int id = input; + pthread_mutex_lock(&sched->scheduler_lock); + if (id == 0) + { + sched->id_allocator++; + id = sched->id_allocator; + } + + while (bst_find(sched->task_queue, id) != NULL) + { + id = sched->id_allocator; + } + pthread_mutex_unlock(&sched->scheduler_lock); + return id; +} +void antd_scheduler_ext_statistic(int fd, void *data) +{ + UNUSED(fd); + UNUSED(data); +} +int antd_scheduler_validate_data(antd_task_t *task) +{ + return !(difftime(time(NULL), task->access_time) > MAX_VALIDITY_INTERVAL); +} +void antd_scheduler_destroy_data(void *data) +{ + UNUSED(data); +} + +int antd_task_data_id(void *data) +{ + UNUSED(data); + intptr_t ptr = (intptr_t)data; + return (int)ptr; +} \ No newline at end of file diff --git a/lib/scheduler.h b/lib/scheduler.h index 3f53b31..7b77ad7 100644 --- a/lib/scheduler.h +++ b/lib/scheduler.h @@ -4,143 +4,113 @@ #include #include #include -#define N_PRIORITY 10 -#define NORMAL_PRIORITY ((int)((N_PRIORITY - 1) / 2)) -#define LOW_PRIORITY (N_PRIORITY - 1) -#define HIGH_PRIORITY 0 -#define MAX_VALIDITY_INTERVAL 20 // 10 s for task validity -#define MAX_FIFO_NAME_SZ 255 + +#define antd_create_task(h, d, c, t) (antd_mktask(h, d, c, t, HEAVY)) + typedef enum { LIGHT, HEAVY } antd_task_type_t; -// callback definition -typedef struct __callback_t -{ - void *(*handle)(void *); - struct __callback_t *next; -} antd_callback_t; -// task definition + +typedef struct _antd_scheduler_t antd_scheduler_t; +typedef struct _antd_callback_t antd_callback_t; +typedef struct _antd_queue_item_t antd_scheduler_evt_t; typedef struct { - /* - creation time of a task + /** + * task id + */ + int id; + /** + * creation time of a task */ unsigned long stamp; - /* - Last access time of - task data + /** + * Last access time of + * task data */ time_t access_time; - /* - priority from 0 to N_PRIORITY - 1 - higher value is lower priority - */ - uint8_t priority; - /* - the callback + /** + * the handle and callback */ void *(*handle)(void *); antd_callback_t *callback; - /* - user data if any - */ + /** + * The task events + * each task must be binded to + * one or more event, otherwise it will be + * rejected by the scheduler + * */ + antd_scheduler_evt_t* events; + /** + * user data if any + */ void *data; - /* - type of a task - light tasks are executed directly - heavy tasks are delegated to workers + /** + * type of a task + * light tasks are executed directly + * heavy tasks are delegated to workers */ antd_task_type_t type; } antd_task_t; - -typedef struct __task_item_t -{ - antd_task_t *task; - struct __task_item_t *next; -} * antd_task_item_t; - -typedef antd_task_item_t antd_task_queue_t; - -typedef struct -{ - int id; - pthread_t tid; - void *manager; -} antd_worker_t; - -typedef struct -{ - // data lock - pthread_mutex_t scheduler_lock; - pthread_mutex_t worker_lock; - pthread_mutex_t pending_lock; - // event handle - sem_t *scheduler_sem; - sem_t *worker_sem; - // worker and data - antd_task_queue_t task_queue[N_PRIORITY]; - antd_task_queue_t workers_queue; - uint8_t status; // 0 stop, 1 working - antd_worker_t *workers; - int n_workers; - int pending_task; - /* - function pointer that free data in a task if - the task is not valid - default to NULL - */ - void* (*destroy_data)(void*); - int validate_data; - /** - * statistic infomation - */ - char stat_fifo[MAX_FIFO_NAME_SZ]; - int stat_fd; - pthread_t stat_tid; - void (*stat_data_cb)(int, void *); -} antd_scheduler_t; - /* - init the main scheduler +* nit the main scheduler */ -int antd_scheduler_init(antd_scheduler_t *, int); +antd_scheduler_t *antd_scheduler_init(int, const char *stat_name); /* - destroy all pending task +* destroy all pending task */ void antd_scheduler_destroy(antd_scheduler_t *); -/* - create a task - parameter: - - handle - - data - - callback - - last data access time +/** +* create a task +* parameter: +* - handle +* - data +* - callback +* - last data access time */ -antd_task_t *antd_create_task(void *(*handle)(void *), void *data, void *(*callback)(void *), time_t); +antd_task_t *antd_mktask(void *(*handle)(void *), void *data, void *(*callback)(void *), time_t, antd_task_type_t type); -/* - add a task +/** +* add a task */ -void antd_add_task(antd_scheduler_t *, antd_task_t *); -/* - execute and free a task a task -*/ -void antd_execute_task(antd_scheduler_t *, antd_task_item_t); -/* - scheduler status +void antd_scheduler_add_task(antd_scheduler_t *, antd_task_t *); + +/** +* check if scheduler is busy */ int antd_scheduler_busy(antd_scheduler_t *); -/* - schedule a task +/** + * get scheduler status + * */ +int antd_scheduler_ok(antd_scheduler_t *scheduler); +/** +* +* wait for event */ -int antd_task_schedule(antd_scheduler_t *); -/* -wait for event -*/ -void antd_wait(antd_scheduler_t *); +void* antd_scheduler_wait(void *); -antd_callback_t* callback_of( void* (*callback)(void*) ); +/** + * lock the scheduler + * */ +void antd_scheduler_lock(antd_scheduler_t *); +/** + * Get next valid task id + * */ +int antd_scheduler_next_id(antd_scheduler_t* sched, int input); +/** + * unlock the scheduler + * */ +void antd_scheduler_unlock(antd_scheduler_t *); + +/** + * weak functions that should be overridden by the application + * that user the scheduler as library +*/ +void __attribute__((weak)) antd_scheduler_ext_statistic(int fd, void *data); +int __attribute__((weak)) antd_scheduler_validate_data(antd_task_t *task); +void __attribute__((weak)) antd_scheduler_destroy_data(void *data); +int __attribute__((weak)) antd_task_data_id(void *data); #endif \ No newline at end of file diff --git a/lib/utils.c b/lib/utils.c index ca0d56c..8770d2f 100644 --- a/lib/utils.c +++ b/lib/utils.c @@ -545,4 +545,53 @@ int mkdirp(const char* path, mode_t mode) } } return mkdir(path, mode); +} + + +int guard_read(int fd, void* buffer, size_t size) +{ + int n = 0; + int read_len; + int st; + while(n != (int)size) + { + read_len = (int)size - n; + st = read(fd,buffer + n,read_len); + if(st == -1) + { + ERROR( "Unable to read from #%d: %s", fd, strerror(errno)); + return -1; + } + if(st == 0) + { + ERROR("Endpoint %d is closed", fd); + return -1; + } + n += st; + } + return n; +} + +int guard_write(int fd, void* buffer, size_t size) +{ + int n = 0; + int write_len; + int st; + while(n != (int)size) + { + write_len = (int)size - n; + st = write(fd,buffer + n,write_len); + if(st == -1) + { + ERROR("Unable to write to #%d: %s", fd, strerror(errno)); + return -1; + } + if(st == 0) + { + ERROR("Endpoint %d is closed", fd); + return -1; + } + n += st; + } + return n; } \ No newline at end of file diff --git a/lib/utils.h b/lib/utils.h index ca7f3d1..de97369 100644 --- a/lib/utils.h +++ b/lib/utils.h @@ -94,4 +94,6 @@ void md5(uint8_t *, size_t , char*); void sha1(const char*, char*); void digest_to_hex(const uint8_t *, char *); void verify_header(char* k); +int guard_read(int fd, void* buffer, size_t size); +int guard_write(int fd, void* buffer, size_t size); #endif