1
0
mirror of https://github.com/lxsang/ant-http synced 2024-07-03 13:39:46 +02:00

improvement scheduler

This commit is contained in:
lxsang 2021-01-03 11:24:55 +01:00
parent 9d19a81a8e
commit cbbc48d216
8 changed files with 436 additions and 262 deletions

View File

@ -679,7 +679,7 @@ int startup(unsigned *port)
ERROR("Port %d - socket: %s", *port, strerror(errno)); ERROR("Port %d - socket: %s", *port, strerror(errno));
return -1; return -1;
} }
if (setsockopt(httpd, SOL_SOCKET, SO_REUSEADDR, &(int){1}, sizeof(int)) == -1) if (setsockopt(httpd, SOL_SOCKET, SO_REUSEADDR, &(int){1}, sizeof(int)) == -1)
{ {
ERROR("Unable to set reuse address on port %d - setsockopt: %s", *port, strerror(errno)); ERROR("Unable to set reuse address on port %d - setsockopt: %s", *port, strerror(errno));
@ -848,7 +848,7 @@ void *decode_request_header(void *data)
#endif #endif
//if(line) free(line); //if(line) free(line);
memset(buf, 0, sizeof(buf)); memset(buf, 0, sizeof(buf));
strcat(buf, url); strncat(buf, url, sizeof(buf) - 1);
LOG("Original query (%d): %s", rq->client->sock, url); LOG("Original query (%d): %s", rq->client->sock, url);
query = apply_rules(pcnf->rules, host, buf); query = apply_rules(pcnf->rules, host, buf);
LOG("Processed query: %s", query); LOG("Processed query: %s", query);
@ -973,8 +973,11 @@ void ws_confirm_request(void *client, const char *key)
char rkey[128]; char rkey[128];
char sha_d[20]; char sha_d[20];
char base64[64]; char base64[64];
strncpy(rkey, key, 128); strncpy(rkey, key, sizeof(rkey)-1);
strcat(rkey, WS_MAGIC_STRING); int n = (int)sizeof(rkey) - (int)strlen(key);
if (n < 0)
n = 0;
strncat(rkey, WS_MAGIC_STRING, n);
//printf("RESPONDKEY '%s'\n", rkey); //printf("RESPONDKEY '%s'\n", rkey);
#ifdef USE_OPENSSL #ifdef USE_OPENSSL
SHA_CTX context; SHA_CTX context;

36
httpd.c
View File

@ -21,7 +21,7 @@
snprintf(buff, BUFFLEN, ##__VA_ARGS__); \ snprintf(buff, BUFFLEN, ##__VA_ARGS__); \
ret = write(fd, buff, strlen(buff)); ret = write(fd, buff, strlen(buff));
static antd_scheduler_t scheduler; static antd_scheduler_t* scheduler;
#ifdef USE_OPENSSL #ifdef USE_OPENSSL
@ -148,7 +148,7 @@ static void stop_serve(int dummy)
sigaddset(&mask, SIGPIPE); sigaddset(&mask, SIGPIPE);
sigaddset(&mask, SIGABRT); sigaddset(&mask, SIGABRT);
sigprocmask(SIG_BLOCK, &mask, NULL); sigprocmask(SIG_BLOCK, &mask, NULL);
antd_scheduler_destroy(&scheduler); antd_scheduler_destroy(scheduler);
unload_all_plugin(); unload_all_plugin();
#ifdef USE_OPENSSL #ifdef USE_OPENSSL
FIPS_mode_set(0); FIPS_mode_set(0);
@ -236,18 +236,18 @@ static void antd_monitor(port_config_t *pcnf)
}*/ }*/
} }
#endif #endif
pthread_mutex_lock(&scheduler.scheduler_lock); antd_scheduler_lock(scheduler);
conf->connection++; conf->connection++;
pthread_mutex_unlock(&scheduler.scheduler_lock); antd_scheduler_unlock(scheduler);
// create callback for the server // create callback for the server
task = antd_create_task(accept_request, (void *)request, finish_request, client->last_io); task = antd_create_task(accept_request, (void *)request, finish_request, client->last_io);
//task->type = LIGHT; //task->type = LIGHT;
antd_add_task(&scheduler, task); antd_scheduler_add_task(scheduler, task);
} }
} }
} }
static void client_statistic(int fd, void *user_data) void antd_scheduler_ext_statistic(int fd, void *user_data)
{ {
antd_request_t *request = (antd_request_t *)user_data; antd_request_t *request = (antd_request_t *)user_data;
chain_t it, it1; chain_t it, it1;
@ -295,6 +295,19 @@ static void client_statistic(int fd, void *user_data)
UNUSED(ret); UNUSED(ret);
} }
void antd_scheduler_destroy_data(void *data)
{
finish_request(data);
}
int antd_task_data_id(void *data)
{
antd_request_t *rq = (antd_request_t *)data;
if(!rq)
return 0;
return antd_scheduler_next_id(scheduler,rq->client->sock);
}
int main(int argc, char *argv[]) int main(int argc, char *argv[])
{ {
pthread_t sched_th; pthread_t sched_th;
@ -333,11 +346,8 @@ int main(int argc, char *argv[])
#endif #endif
// enable scheduler // enable scheduler
// default to 4 workers // default to 4 workers
scheduler.validate_data = 1; scheduler = antd_scheduler_init( conf->n_workers, conf->stat_fifo_path);
scheduler.destroy_data = finish_request; if (scheduler == NULL)
strncpy(scheduler.stat_fifo, conf->stat_fifo_path, MAX_FIFO_NAME_SZ);
scheduler.stat_data_cb = client_statistic;
if (antd_scheduler_init(&scheduler, conf->n_workers) == -1)
{ {
ERROR("Unable to initialise scheduler. Exit"); ERROR("Unable to initialise scheduler. Exit");
stop_serve(0); stop_serve(0);
@ -371,7 +381,7 @@ int main(int argc, char *argv[])
exit(1); exit(1);
} }
// Start scheduler // Start scheduler
if (pthread_create(&sched_th, NULL, (void *(*)(void *))antd_wait, (void *)&scheduler) != 0) if (pthread_create(&sched_th, NULL, (void *(*)(void *))antd_scheduler_wait, (void *)scheduler) != 0)
{ {
ERROR("pthread_create: cannot start scheduler thread"); ERROR("pthread_create: cannot start scheduler thread");
stop_serve(0); stop_serve(0);
@ -383,7 +393,7 @@ int main(int argc, char *argv[])
pthread_detach(sched_th); pthread_detach(sched_th);
} }
while (scheduler.status) while (antd_scheduler_ok(scheduler))
{ {
if (conf->connection > conf->maxcon) if (conf->connection > conf->maxcon)
{ {

View File

@ -3,12 +3,16 @@
#include "bst.h" #include "bst.h"
void bst_free(bst_node_t* root) void bst_free_cb(bst_node_t* root, void (*cb)(void*))
{ {
if(root != NULL) if(root != NULL)
{ {
bst_free(root->left); bst_free(root->left);
bst_free(root->right); bst_free(root->right);
if(root->data && cb)
{
cb(root->data);
}
free(root); free(root);
} }
} }

View File

@ -1,6 +1,6 @@
#ifndef BST_H #ifndef BST_H
#define BST_H 1 #define BST_H 1
#define bst_free(n) (bst_free_cb(n, NULL))
typedef struct _tree_node typedef struct _tree_node
{ {
int key; int key;
@ -9,7 +9,7 @@ typedef struct _tree_node
struct _tree_node* right; struct _tree_node* right;
} bst_node_t; } bst_node_t;
void bst_free(bst_node_t* root); void bst_free_cb(bst_node_t* root, void (*callback)(void*));
bst_node_t* bst_insert(bst_node_t* root, int key, void* data); bst_node_t* bst_insert(bst_node_t* root, int key, void* data);
bst_node_t* bst_find_min(bst_node_t* root); bst_node_t* bst_find_min(bst_node_t* root);
bst_node_t* bst_find_max(bst_node_t* root); bst_node_t* bst_find_max(bst_node_t* root);

View File

@ -7,6 +7,70 @@
#include <unistd.h> #include <unistd.h>
#include "scheduler.h" #include "scheduler.h"
#include "utils.h" #include "utils.h"
#include "bst.h"
#define MAX_VALIDITY_INTERVAL 20
#define MAX_FIFO_NAME_SZ 255
// callback definition
struct _antd_callback_t
{
void *(*handle)(void *);
struct _antd_callback_t *next;
};
typedef struct {
int type;
int fd;
} antd_scheduler_evt_item_t;
struct _antd_queue_item_t
{
union
{
antd_scheduler_evt_item_t* evt;
antd_task_t *task;
void * raw_ptr;
};
struct _antd_queue_item_t *next;
};
typedef struct _antd_queue_item_t* antd_queue_item_t;
typedef antd_queue_item_t antd_queue_t;
typedef struct
{
int id;
pthread_t tid;
void *manager;
} antd_worker_t;
struct _antd_scheduler_t
{
// data lock
pthread_mutex_t scheduler_lock;
pthread_mutex_t worker_lock;
pthread_mutex_t pending_lock;
// event handle
sem_t *scheduler_sem;
sem_t *worker_sem;
// worker and data
bst_node_t *task_queue;
antd_queue_t workers_queue;
uint8_t status; // 0 stop, 1 working
antd_worker_t *workers;
int n_workers;
int pending_task;
int id_allocator;
char stat_fifo[MAX_FIFO_NAME_SZ];
int stat_fd;
pthread_t stat_tid;
};
static antd_callback_t *callback_of(void *(*callback)(void *));
static void antd_execute_task(antd_scheduler_t *, antd_task_t *);
static int antd_task_schedule(antd_scheduler_t *);
static void set_nonblock(int fd) static void set_nonblock(int fd)
{ {
@ -19,21 +83,21 @@ static void set_nonblock(int fd)
fcntl(fd, F_SETFL, flags | O_NONBLOCK); fcntl(fd, F_SETFL, flags | O_NONBLOCK);
} }
static void enqueue(antd_task_queue_t *q, antd_task_t *task) static void enqueue(antd_queue_t *q, void *data)
{ {
antd_task_item_t it = *q; antd_queue_item_t it = *q;
while (it && it->next != NULL) while (it && it->next != NULL)
it = it->next; it = it->next;
antd_task_item_t taski = (antd_task_item_t)malloc(sizeof *taski); antd_queue_item_t new_it = (antd_queue_item_t)malloc(sizeof *new_it);
taski->task = task; new_it->raw_ptr = data;
taski->next = NULL; new_it->next = NULL;
if (!it) // first task if (!it) // first task
{ {
*q = taski; *q = new_it;
} }
else else
{ {
it->next = taski; it->next = new_it;
} }
} }
@ -61,9 +125,9 @@ static void stop(antd_scheduler_t *scheduler)
sem_close(scheduler->worker_sem); sem_close(scheduler->worker_sem);
} }
static antd_task_item_t dequeue(antd_task_queue_t *q) static antd_queue_item_t dequeue(antd_queue_t *q)
{ {
antd_task_item_t it = *q; antd_queue_item_t it = *q;
if (it) if (it)
{ {
*q = it->next; *q = it->next;
@ -72,7 +136,7 @@ static antd_task_item_t dequeue(antd_task_queue_t *q)
return it; return it;
} }
antd_callback_t *callback_of(void *(*callback)(void *)) static antd_callback_t *callback_of(void *(*callback)(void *))
{ {
antd_callback_t *cb = NULL; antd_callback_t *cb = NULL;
if (callback) if (callback)
@ -114,13 +178,8 @@ static void execute_callback(antd_scheduler_t *scheduler, antd_task_t *task)
// call the first come call back // call the first come call back
task->handle = cb->handle; task->handle = cb->handle;
task->callback = task->callback->next; task->callback = task->callback->next;
task->priority = task->priority + 1;
if (task->priority > N_PRIORITY - 1)
{
task->priority = N_PRIORITY - 1;
}
free(cb); free(cb);
antd_add_task(scheduler, task); antd_scheduler_add_task(scheduler, task);
} }
else else
{ {
@ -128,17 +187,30 @@ static void execute_callback(antd_scheduler_t *scheduler, antd_task_t *task)
} }
} }
static void destroy_queue(antd_task_queue_t q) static void destroy_queue(antd_queue_t q, int is_task)
{ {
antd_task_item_t it, curr; antd_queue_item_t it, curr;
it = q; it = q;
while (it) while (it)
{ {
// first free the task if(is_task)
if (it->task && it->task->callback) {
free_callback(it->task->callback); // first free the task
if (it->task) if (it->task && it->task->callback)
free(it->task); {
free_callback(it->task->callback);
it->task->callback = NULL;
}
if (it->task)
free(it->task);
}
else
{
if(it->raw_ptr)
{
free(it->raw_ptr);
}
}
// then free the placeholder // then free the placeholder
curr = it; curr = it;
it = it->next; it = it->next;
@ -150,7 +222,7 @@ static void *work(antd_worker_t *worker)
antd_scheduler_t *scheduler = (antd_scheduler_t *)worker->manager; antd_scheduler_t *scheduler = (antd_scheduler_t *)worker->manager;
while (scheduler->status) while (scheduler->status)
{ {
antd_task_item_t it; antd_queue_item_t it;
pthread_mutex_lock(&scheduler->worker_lock); pthread_mutex_lock(&scheduler->worker_lock);
it = dequeue(&scheduler->workers_queue); it = dequeue(&scheduler->workers_queue);
pthread_mutex_unlock(&scheduler->worker_lock); pthread_mutex_unlock(&scheduler->worker_lock);
@ -165,18 +237,57 @@ static void *work(antd_worker_t *worker)
else else
{ {
//LOG("task executed by worker %d\n", worker->id); //LOG("task executed by worker %d\n", worker->id);
antd_execute_task(scheduler, it); antd_execute_task(scheduler, it->task);
free(it);
} }
} }
return NULL; return NULL;
} }
static void print_static_info(bst_node_t *node, void **args, int argc)
{
if (argc != 2)
{
return;
}
int ret;
char *buffer = args[0];
int *fdp = args[1];
antd_task_t *task = (antd_task_t *)node->data;
// send statistic on task data
snprintf(buffer, MAX_FIFO_NAME_SZ, "---- Task %d created at: %lu ----\n", task->id, task->stamp);
ret = write(*fdp, buffer, strlen(buffer));
// send statistic on task data
snprintf(buffer, MAX_FIFO_NAME_SZ, "Access time: %lu\nn", (unsigned long)task->access_time);
ret = write(*fdp, buffer, strlen(buffer));
snprintf(buffer, MAX_FIFO_NAME_SZ, "Current time: %lu\n", (unsigned long)time(NULL));
ret = write(*fdp, buffer, strlen(buffer));
snprintf(buffer, MAX_FIFO_NAME_SZ, "Task type: %d\n", task->type);
ret = write(*fdp, buffer, strlen(buffer));
if (task->handle)
{
snprintf(buffer, MAX_FIFO_NAME_SZ, "Has handle: yes\n");
ret = write(*fdp, buffer, strlen(buffer));
}
if (task->callback)
{
snprintf(buffer, MAX_FIFO_NAME_SZ, "Has callback: yes\n");
ret = write(*fdp, buffer, strlen(buffer));
}
UNUSED(ret);
// now print all task data statistic
antd_scheduler_ext_statistic(*fdp, task->data);
}
static void *statistic(antd_scheduler_t *scheduler) static void *statistic(antd_scheduler_t *scheduler)
{ {
fd_set fd_out; fd_set fd_out;
int ret; int ret;
char buffer[MAX_FIFO_NAME_SZ]; char buffer[MAX_FIFO_NAME_SZ];
antd_task_item_t it; void *argc[2];
while (scheduler->status) while (scheduler->status)
{ {
if (scheduler->stat_fd == -1) if (scheduler->stat_fd == -1)
@ -192,6 +303,8 @@ static void *statistic(antd_scheduler_t *scheduler)
set_nonblock(scheduler->stat_fd); set_nonblock(scheduler->stat_fd);
} }
} }
argc[0] = buffer;
argc[1] = &scheduler->stat_fd;
FD_ZERO(&fd_out); FD_ZERO(&fd_out);
FD_SET(scheduler->stat_fd, &fd_out); FD_SET(scheduler->stat_fd, &fd_out);
ret = select(scheduler->stat_fd + 1, NULL, &fd_out, NULL, NULL); ret = select(scheduler->stat_fd + 1, NULL, &fd_out, NULL, NULL);
@ -215,48 +328,8 @@ static void *statistic(antd_scheduler_t *scheduler)
snprintf(buffer, MAX_FIFO_NAME_SZ, "Pending task: %d. Detail:\n", scheduler->pending_task); snprintf(buffer, MAX_FIFO_NAME_SZ, "Pending task: %d. Detail:\n", scheduler->pending_task);
ret = write(scheduler->stat_fd, buffer, strlen(buffer)); ret = write(scheduler->stat_fd, buffer, strlen(buffer));
for (int i = 0; i < N_PRIORITY; i++) bst_for_each(scheduler->task_queue, print_static_info, argc, 2);
{
snprintf(buffer, MAX_FIFO_NAME_SZ, "#### PRIORITY: %d\n", i);
ret = write(scheduler->stat_fd, buffer, strlen(buffer));
it = scheduler->task_queue[i];
while (it)
{
// send statistic on task data
snprintf(buffer, MAX_FIFO_NAME_SZ, "---- Task created at: %lu ----\n", it->task->stamp);
ret = write(scheduler->stat_fd, buffer, strlen(buffer));
// send statistic on task data
snprintf(buffer, MAX_FIFO_NAME_SZ, "Access time: %lu\nn", (unsigned long)it->task->access_time);
ret = write(scheduler->stat_fd, buffer, strlen(buffer));
snprintf(buffer, MAX_FIFO_NAME_SZ, "Current time: %lu\n", (unsigned long)time(NULL));
ret = write(scheduler->stat_fd, buffer, strlen(buffer));
snprintf(buffer, MAX_FIFO_NAME_SZ, "Task type: %d\n", it->task->type);
ret = write(scheduler->stat_fd, buffer, strlen(buffer));
if (it->task->handle)
{
snprintf(buffer, MAX_FIFO_NAME_SZ, "Has handle: yes\n");
ret = write(scheduler->stat_fd, buffer, strlen(buffer));
}
if (it->task->callback)
{
snprintf(buffer, MAX_FIFO_NAME_SZ, "Has callback: yes\n");
ret = write(scheduler->stat_fd, buffer, strlen(buffer));
}
// now print all task data statistic
if (scheduler->stat_data_cb)
{
scheduler->stat_data_cb(scheduler->stat_fd, it->task->data);
}
it = it->next;
}
}
pthread_mutex_unlock(&scheduler->scheduler_lock); pthread_mutex_unlock(&scheduler->scheduler_lock);
ret = close(scheduler->stat_fd); ret = close(scheduler->stat_fd);
scheduler->stat_fd = -1; scheduler->stat_fd = -1;
@ -265,7 +338,7 @@ static void *statistic(antd_scheduler_t *scheduler)
else else
{ {
ret = write(scheduler->stat_fd, ".", 1); ret = write(scheduler->stat_fd, ".", 1);
if(ret == -1) if (ret == -1)
{ {
ret = close(scheduler->stat_fd); ret = close(scheduler->stat_fd);
scheduler->stat_fd = -1; scheduler->stat_fd = -1;
@ -279,8 +352,8 @@ static void *statistic(antd_scheduler_t *scheduler)
} }
else else
{ {
ret = close(scheduler->stat_fd); ret = close(scheduler->stat_fd);
scheduler->stat_fd = -1; scheduler->stat_fd = -1;
} }
break; break;
} }
@ -302,36 +375,40 @@ static void *statistic(antd_scheduler_t *scheduler)
init the main scheduler init the main scheduler
*/ */
int antd_scheduler_init(antd_scheduler_t *scheduler, int n) antd_scheduler_t *antd_scheduler_init(int n, const char *stat_name)
{ {
antd_scheduler_t *scheduler = (antd_scheduler_t *)malloc(sizeof(antd_scheduler_t));
scheduler->n_workers = n; scheduler->n_workers = n;
scheduler->status = 1; scheduler->status = 1;
scheduler->workers_queue = NULL; scheduler->workers_queue = NULL;
scheduler->pending_task = 0; scheduler->pending_task = 0;
// scheduler->validate_data = 0;
// scheduler->destroy_data = NULL;
scheduler->stat_fd = -1; scheduler->stat_fd = -1;
//scheduler->stat_data_cb = NULL; scheduler->id_allocator = 0;
//memset(scheduler->stat_fifo, 0, MAX_FIFO_NAME_SZ); (void)memset(scheduler->stat_fifo, 0, MAX_FIFO_NAME_SZ);
if (stat_name)
{
(void)strncpy(scheduler->stat_fifo, stat_name, MAX_FIFO_NAME_SZ - 1);
}
// init semaphore // init semaphore
scheduler->scheduler_sem = sem_open("scheduler", O_CREAT, 0600, 0); scheduler->scheduler_sem = sem_open("scheduler", O_CREAT, 0600, 0);
if (scheduler->scheduler_sem == SEM_FAILED) if (scheduler->scheduler_sem == SEM_FAILED)
{ {
ERROR("Cannot open semaphore for scheduler"); ERROR("Cannot open semaphore for scheduler");
return -1; free(scheduler);
return NULL;
} }
scheduler->worker_sem = sem_open("worker", O_CREAT, 0600, 0); scheduler->worker_sem = sem_open("worker", O_CREAT, 0600, 0);
if (!scheduler->worker_sem) if (!scheduler->worker_sem)
{ {
ERROR("Cannot open semaphore for workers"); ERROR("Cannot open semaphore for workers");
return -1; free(scheduler);
return NULL;
} }
// init lock // init lock
pthread_mutex_init(&scheduler->scheduler_lock, NULL); pthread_mutex_init(&scheduler->scheduler_lock, NULL);
pthread_mutex_init(&scheduler->worker_lock, NULL); pthread_mutex_init(&scheduler->worker_lock, NULL);
pthread_mutex_init(&scheduler->pending_lock, NULL); pthread_mutex_init(&scheduler->pending_lock, NULL);
for (int i = 0; i < N_PRIORITY; i++) scheduler->task_queue = NULL;
scheduler->task_queue[i] = NULL;
// create scheduler.workers // create scheduler.workers
if (n > 0) if (n > 0)
{ {
@ -339,7 +416,8 @@ int antd_scheduler_init(antd_scheduler_t *scheduler, int n)
if (!scheduler->workers) if (!scheduler->workers)
{ {
ERROR("Cannot allocate memory for worker"); ERROR("Cannot allocate memory for worker");
return -1; free(scheduler);
return NULL;
} }
for (int i = 0; i < scheduler->n_workers; i++) for (int i = 0; i < scheduler->n_workers; i++)
{ {
@ -348,7 +426,8 @@ int antd_scheduler_init(antd_scheduler_t *scheduler, int n)
if (pthread_create(&scheduler->workers[i].tid, NULL, (void *(*)(void *))work, (void *)&scheduler->workers[i]) != 0) if (pthread_create(&scheduler->workers[i].tid, NULL, (void *(*)(void *))work, (void *)&scheduler->workers[i]) != 0)
{ {
ERROR("pthread_create: cannot create worker: %s", strerror(errno)); ERROR("pthread_create: cannot create worker: %s", strerror(errno));
return -1; free(scheduler);
return NULL;
} }
else else
{ {
@ -375,37 +454,47 @@ int antd_scheduler_init(antd_scheduler_t *scheduler, int n)
} }
} }
LOG("Antd scheduler initialized with %d worker", scheduler->n_workers); LOG("Antd scheduler initialized with %d worker", scheduler->n_workers);
return 0; return scheduler;
} }
static void destroy_task(void *data)
{
antd_task_t *task = (antd_task_t *)data;
if (task && task->callback)
free_callback(task->callback);
if (task)
free(task);
}
/* /*
destroy all pending task destroy all pending task
pthread_mutex_lock(&scheduler.queue_lock); pthread_mutex_lock(&scheduler.queue_lock);
*/ */
void antd_scheduler_destroy(antd_scheduler_t *scheduler) void antd_scheduler_destroy(antd_scheduler_t *scheduler)
{ {
if (!scheduler)
return;
// free all the chains // free all the chains
stop(scheduler); stop(scheduler);
LOG("Destroy remaining queue"); LOG("Destroy remaining queue");
for (int i = 0; i < N_PRIORITY; i++) bst_free_cb(scheduler->task_queue, destroy_task);
{ scheduler->task_queue = NULL;
destroy_queue(scheduler->task_queue[i]); destroy_queue(scheduler->workers_queue,1);
} free(scheduler);
destroy_queue(scheduler->workers_queue);
} }
/* /*
create a task create a task
*/ */
antd_task_t *antd_create_task(void *(*handle)(void *), void *data, void *(*callback)(void *), time_t atime) antd_task_t *antd_mktask(void *(*handle)(void *), void *data, void *(*callback)(void *), time_t atime, antd_task_type_t type)
{ {
antd_task_t *task = (antd_task_t *)malloc(sizeof *task); antd_task_t *task = (antd_task_t *)malloc(sizeof *task);
task->stamp = (unsigned long)time(NULL); task->stamp = (unsigned long)time(NULL);
task->data = data; task->data = data;
task->handle = handle; task->handle = handle;
task->id = antd_task_data_id(data);
task->callback = callback_of(callback); task->callback = callback_of(callback);
task->priority = HIGH_PRIORITY; task->type = type;
task->type = HEAVY;
//task->type = LIGHT;
task->access_time = atime; task->access_time = atime;
return task; return task;
} }
@ -413,13 +502,12 @@ antd_task_t *antd_create_task(void *(*handle)(void *), void *data, void *(*callb
/* /*
scheduling a task scheduling a task
*/ */
void antd_add_task(antd_scheduler_t *scheduler, antd_task_t *task) void antd_scheduler_add_task(antd_scheduler_t *scheduler, antd_task_t *task)
{ {
// check if task is exist if(task->id == 0)
int prio = task->priority > N_PRIORITY - 1 ? N_PRIORITY - 1 : task->priority; task->id = antd_scheduler_next_id(scheduler, task->id);
//LOG("Prio is %d\n", prio);
pthread_mutex_lock(&scheduler->scheduler_lock); pthread_mutex_lock(&scheduler->scheduler_lock);
enqueue(&scheduler->task_queue[prio], task); scheduler->task_queue = bst_insert(scheduler->task_queue, task->id, (void *)task);
pthread_mutex_unlock(&scheduler->scheduler_lock); pthread_mutex_unlock(&scheduler->scheduler_lock);
pthread_mutex_lock(&scheduler->pending_lock); pthread_mutex_lock(&scheduler->pending_lock);
scheduler->pending_task++; scheduler->pending_task++;
@ -428,50 +516,42 @@ void antd_add_task(antd_scheduler_t *scheduler, antd_task_t *task)
sem_post(scheduler->scheduler_sem); sem_post(scheduler->scheduler_sem);
} }
void antd_execute_task(antd_scheduler_t *scheduler, antd_task_item_t taski) static void antd_execute_task(antd_scheduler_t *scheduler, antd_task_t *task)
{ {
if (!taski) if (!task)
return; return;
// execute the task // execute the task
void *ret = (*(taski->task->handle))(taski->task->data); void *ret = (*(task->handle))(task->data);
// check the return data if it is a new task // check the return data if it is a new task
if (!ret) if (!ret)
{ {
// call the first callback // call the first callback
execute_callback(scheduler, taski->task); execute_callback(scheduler, task);
free(taski);
} }
else else
{ {
antd_task_t *rtask = (antd_task_t *)ret; antd_task_t *rtask = (antd_task_t *)ret;
if (taski->task->callback) if (task->callback)
{ {
if (rtask->callback) if (rtask->callback)
{ {
enqueue_callback(rtask->callback, taski->task->callback); enqueue_callback(rtask->callback, task->callback);
} }
else else
{ {
rtask->callback = taski->task->callback; rtask->callback = task->callback;
} }
} }
if (!rtask->handle) if (!rtask->handle)
{ {
// call the first callback // call the first callback
execute_callback(scheduler, rtask); execute_callback(scheduler, rtask);
free(taski->task); free(task);
free(taski);
} }
else else
{ {
rtask->priority = taski->task->priority + 1; antd_scheduler_add_task(scheduler, rtask);
if (rtask->priority > N_PRIORITY - 1) free(task);
{
rtask->priority = N_PRIORITY - 1;
}
antd_add_task(scheduler, rtask);
free(taski->task);
free(taski);
} }
} }
} }
@ -481,21 +561,31 @@ int antd_scheduler_busy(antd_scheduler_t *scheduler)
return scheduler->pending_task != 0; return scheduler->pending_task != 0;
} }
int antd_task_schedule(antd_scheduler_t *scheduler) void antd_scheduler_lock(antd_scheduler_t *sched)
{
pthread_mutex_lock(&sched->scheduler_lock);
}
void antd_scheduler_unlock(antd_scheduler_t *sched)
{
pthread_mutex_unlock(&sched->scheduler_lock);
}
static int antd_task_schedule(antd_scheduler_t *scheduler)
{ {
// fetch next task from the task_queue // fetch next task from the task_queue
antd_task_item_t it = NULL; antd_task_t *task = NULL;
bst_node_t *node;
pthread_mutex_lock(&scheduler->scheduler_lock); pthread_mutex_lock(&scheduler->scheduler_lock);
for (int i = 0; i < N_PRIORITY; i++) node = bst_find_min(scheduler->task_queue);
if (node)
{ {
task = (antd_task_t *)node->data;
it = dequeue(&scheduler->task_queue[i]); scheduler->task_queue = bst_delete(scheduler->task_queue, node->key);
if (it)
break;
} }
pthread_mutex_unlock(&scheduler->scheduler_lock); pthread_mutex_unlock(&scheduler->scheduler_lock);
// no task // no task
if (!it) if (!task)
{ {
return 0; return 0;
} }
@ -504,41 +594,44 @@ int antd_task_schedule(antd_scheduler_t *scheduler)
pthread_mutex_unlock(&scheduler->pending_lock); pthread_mutex_unlock(&scheduler->pending_lock);
// has the task now // has the task now
// validate the task // validate the task
if (scheduler->validate_data && difftime(time(NULL), it->task->access_time) > MAX_VALIDITY_INTERVAL && it->task->priority == N_PRIORITY - 1) //if (scheduler->validate_data && difftime(time(NULL), it->task->access_time) > MAX_VALIDITY_INTERVAL && it->task->priority == N_PRIORITY - 1)
if (antd_scheduler_validate_data(task) == 0)
{ {
// data task is not valid // data task is not valid
LOG("Task is no longer valid and will be killed"); LOG("Task is no longer valid and will be killed");
if (scheduler->destroy_data) antd_scheduler_destroy_data(task->data);
scheduler->destroy_data(it->task->data); if (task->callback)
if (it->task->callback) {
free_callback(it->task->callback); free_callback(task->callback);
free(it->task); task->callback = NULL;
free(it); }
free(task);
return 0; return 0;
} }
// check the type of task // check the type of task
if (it->task->type == LIGHT || scheduler->n_workers <= 0) if (task->type == LIGHT || scheduler->n_workers <= 0)
{ {
// do it by myself // do it by myself
antd_execute_task(scheduler, it); antd_execute_task(scheduler, task);
} }
else else
{ {
// delegate to other workers by // delegate to other workers by
//pushing to the worker queue //pushing to the worker queue
pthread_mutex_lock(&scheduler->worker_lock); pthread_mutex_lock(&scheduler->worker_lock);
enqueue(&scheduler->workers_queue, it->task); enqueue(&scheduler->workers_queue, task);
pthread_mutex_unlock(&scheduler->worker_lock); pthread_mutex_unlock(&scheduler->worker_lock);
// wake up idle worker // wake up idle worker
sem_post(scheduler->worker_sem); sem_post(scheduler->worker_sem);
free(it);
} }
return 1; return 1;
} }
void antd_wait(antd_scheduler_t *scheduler) void* antd_scheduler_wait(void* ptr)
{ {
int stat; int stat;
antd_scheduler_t *scheduler = (antd_scheduler_t *) ptr;
while (scheduler->status) while (scheduler->status)
{ {
stat = antd_task_schedule(scheduler); stat = antd_task_schedule(scheduler);
@ -548,4 +641,47 @@ void antd_wait(antd_scheduler_t *scheduler)
sem_wait(scheduler->scheduler_sem); sem_wait(scheduler->scheduler_sem);
} }
} }
return NULL;
} }
int antd_scheduler_ok(antd_scheduler_t *scheduler)
{
return scheduler->status;
}
int antd_scheduler_next_id(antd_scheduler_t *sched, int input)
{
int id = input;
pthread_mutex_lock(&sched->scheduler_lock);
if (id == 0)
{
sched->id_allocator++;
id = sched->id_allocator;
}
while (bst_find(sched->task_queue, id) != NULL)
{
id = sched->id_allocator;
}
pthread_mutex_unlock(&sched->scheduler_lock);
return id;
}
void antd_scheduler_ext_statistic(int fd, void *data)
{
UNUSED(fd);
UNUSED(data);
}
int antd_scheduler_validate_data(antd_task_t *task)
{
return !(difftime(time(NULL), task->access_time) > MAX_VALIDITY_INTERVAL);
}
void antd_scheduler_destroy_data(void *data)
{
UNUSED(data);
}
int antd_task_data_id(void *data)
{
UNUSED(data);
intptr_t ptr = (intptr_t)data;
return (int)ptr;
}

View File

@ -4,143 +4,113 @@
#include <pthread.h> #include <pthread.h>
#include <semaphore.h> #include <semaphore.h>
#include <stdint.h> #include <stdint.h>
#define N_PRIORITY 10
#define NORMAL_PRIORITY ((int)((N_PRIORITY - 1) / 2)) #define antd_create_task(h, d, c, t) (antd_mktask(h, d, c, t, HEAVY))
#define LOW_PRIORITY (N_PRIORITY - 1)
#define HIGH_PRIORITY 0
#define MAX_VALIDITY_INTERVAL 20 // 10 s for task validity
#define MAX_FIFO_NAME_SZ 255
typedef enum typedef enum
{ {
LIGHT, LIGHT,
HEAVY HEAVY
} antd_task_type_t; } antd_task_type_t;
// callback definition
typedef struct __callback_t typedef struct _antd_scheduler_t antd_scheduler_t;
{ typedef struct _antd_callback_t antd_callback_t;
void *(*handle)(void *); typedef struct _antd_queue_item_t antd_scheduler_evt_t;
struct __callback_t *next;
} antd_callback_t;
// task definition
typedef struct typedef struct
{ {
/* /**
creation time of a task * task id
*/
int id;
/**
* creation time of a task
*/ */
unsigned long stamp; unsigned long stamp;
/* /**
Last access time of * Last access time of
task data * task data
*/ */
time_t access_time; time_t access_time;
/* /**
priority from 0 to N_PRIORITY - 1 * the handle and callback
higher value is lower priority
*/
uint8_t priority;
/*
the callback
*/ */
void *(*handle)(void *); void *(*handle)(void *);
antd_callback_t *callback; antd_callback_t *callback;
/* /**
user data if any * The task events
*/ * each task must be binded to
* one or more event, otherwise it will be
* rejected by the scheduler
* */
antd_scheduler_evt_t* events;
/**
* user data if any
*/
void *data; void *data;
/* /**
type of a task * type of a task
light tasks are executed directly * light tasks are executed directly
heavy tasks are delegated to workers * heavy tasks are delegated to workers
*/ */
antd_task_type_t type; antd_task_type_t type;
} antd_task_t; } antd_task_t;
typedef struct __task_item_t
{
antd_task_t *task;
struct __task_item_t *next;
} * antd_task_item_t;
typedef antd_task_item_t antd_task_queue_t;
typedef struct
{
int id;
pthread_t tid;
void *manager;
} antd_worker_t;
typedef struct
{
// data lock
pthread_mutex_t scheduler_lock;
pthread_mutex_t worker_lock;
pthread_mutex_t pending_lock;
// event handle
sem_t *scheduler_sem;
sem_t *worker_sem;
// worker and data
antd_task_queue_t task_queue[N_PRIORITY];
antd_task_queue_t workers_queue;
uint8_t status; // 0 stop, 1 working
antd_worker_t *workers;
int n_workers;
int pending_task;
/*
function pointer that free data in a task if
the task is not valid
default to NULL
*/
void* (*destroy_data)(void*);
int validate_data;
/**
* statistic infomation
*/
char stat_fifo[MAX_FIFO_NAME_SZ];
int stat_fd;
pthread_t stat_tid;
void (*stat_data_cb)(int, void *);
} antd_scheduler_t;
/* /*
init the main scheduler * nit the main scheduler
*/ */
int antd_scheduler_init(antd_scheduler_t *, int); antd_scheduler_t *antd_scheduler_init(int, const char *stat_name);
/* /*
destroy all pending task * destroy all pending task
*/ */
void antd_scheduler_destroy(antd_scheduler_t *); void antd_scheduler_destroy(antd_scheduler_t *);
/* /**
create a task * create a task
parameter: * parameter:
- handle * - handle
- data * - data
- callback * - callback
- last data access time * - last data access time
*/ */
antd_task_t *antd_create_task(void *(*handle)(void *), void *data, void *(*callback)(void *), time_t); antd_task_t *antd_mktask(void *(*handle)(void *), void *data, void *(*callback)(void *), time_t, antd_task_type_t type);
/* /**
add a task * add a task
*/ */
void antd_add_task(antd_scheduler_t *, antd_task_t *); void antd_scheduler_add_task(antd_scheduler_t *, antd_task_t *);
/*
execute and free a task a task /**
*/ * check if scheduler is busy
void antd_execute_task(antd_scheduler_t *, antd_task_item_t);
/*
scheduler status
*/ */
int antd_scheduler_busy(antd_scheduler_t *); int antd_scheduler_busy(antd_scheduler_t *);
/* /**
schedule a task * get scheduler status
* */
int antd_scheduler_ok(antd_scheduler_t *scheduler);
/**
*
* wait for event
*/ */
int antd_task_schedule(antd_scheduler_t *); void* antd_scheduler_wait(void *);
/*
wait for event
*/
void antd_wait(antd_scheduler_t *);
antd_callback_t* callback_of( void* (*callback)(void*) ); /**
* lock the scheduler
* */
void antd_scheduler_lock(antd_scheduler_t *);
/**
* Get next valid task id
* */
int antd_scheduler_next_id(antd_scheduler_t* sched, int input);
/**
* unlock the scheduler
* */
void antd_scheduler_unlock(antd_scheduler_t *);
/**
* weak functions that should be overridden by the application
* that user the scheduler as library
*/
void __attribute__((weak)) antd_scheduler_ext_statistic(int fd, void *data);
int __attribute__((weak)) antd_scheduler_validate_data(antd_task_t *task);
void __attribute__((weak)) antd_scheduler_destroy_data(void *data);
int __attribute__((weak)) antd_task_data_id(void *data);
#endif #endif

View File

@ -545,4 +545,53 @@ int mkdirp(const char* path, mode_t mode)
} }
} }
return mkdir(path, mode); return mkdir(path, mode);
}
int guard_read(int fd, void* buffer, size_t size)
{
int n = 0;
int read_len;
int st;
while(n != (int)size)
{
read_len = (int)size - n;
st = read(fd,buffer + n,read_len);
if(st == -1)
{
ERROR( "Unable to read from #%d: %s", fd, strerror(errno));
return -1;
}
if(st == 0)
{
ERROR("Endpoint %d is closed", fd);
return -1;
}
n += st;
}
return n;
}
int guard_write(int fd, void* buffer, size_t size)
{
int n = 0;
int write_len;
int st;
while(n != (int)size)
{
write_len = (int)size - n;
st = write(fd,buffer + n,write_len);
if(st == -1)
{
ERROR("Unable to write to #%d: %s", fd, strerror(errno));
return -1;
}
if(st == 0)
{
ERROR("Endpoint %d is closed", fd);
return -1;
}
n += st;
}
return n;
} }

View File

@ -94,4 +94,6 @@ void md5(uint8_t *, size_t , char*);
void sha1(const char*, char*); void sha1(const char*, char*);
void digest_to_hex(const uint8_t *, char *); void digest_to_hex(const uint8_t *, char *);
void verify_header(char* k); void verify_header(char* k);
int guard_read(int fd, void* buffer, size_t size);
int guard_write(int fd, void* buffer, size_t size);
#endif #endif