mirror of
https://github.com/lxsang/ant-http
synced 2024-12-26 16:58:22 +01:00
use the new scheduler
This commit is contained in:
parent
cbbc48d216
commit
08877f84ad
@ -300,6 +300,7 @@ void *accept_request(void *data)
|
|||||||
antd_request_t *rq = (antd_request_t *)data;
|
antd_request_t *rq = (antd_request_t *)data;
|
||||||
|
|
||||||
task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io);
|
task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io);
|
||||||
|
antd_task_bind_event(task,rq->client->sock,0, TASK_EVT_ON_WRITABLE| TASK_EVT_ON_READABLE);
|
||||||
fd_set read_flags, write_flags;
|
fd_set read_flags, write_flags;
|
||||||
// first verify if the socket is ready
|
// first verify if the socket is ready
|
||||||
antd_client_t *client = (antd_client_t *)rq->client;
|
antd_client_t *client = (antd_client_t *)rq->client;
|
||||||
@ -406,6 +407,7 @@ void *resolve_request(void *data)
|
|||||||
char path[2 * BUFFLEN];
|
char path[2 * BUFFLEN];
|
||||||
antd_request_t *rq = (antd_request_t *)data;
|
antd_request_t *rq = (antd_request_t *)data;
|
||||||
antd_task_t *task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io);
|
antd_task_t *task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io);
|
||||||
|
antd_task_bind_event(task,rq->client->sock,0, TASK_EVT_ON_WRITABLE| TASK_EVT_ON_READABLE);
|
||||||
char *url = (char *)dvalue(rq->request, "RESOURCE_PATH");
|
char *url = (char *)dvalue(rq->request, "RESOURCE_PATH");
|
||||||
char *newurl = NULL;
|
char *newurl = NULL;
|
||||||
char *rqp = NULL;
|
char *rqp = NULL;
|
||||||
@ -498,8 +500,6 @@ void *resolve_request(void *data)
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
task->type = HEAVY;
|
|
||||||
|
|
||||||
// discard all request data
|
// discard all request data
|
||||||
dictionary_t headers = (dictionary_t)dvalue(rq->request, "REQUEST_HEADER");
|
dictionary_t headers = (dictionary_t)dvalue(rq->request, "REQUEST_HEADER");
|
||||||
if (headers)
|
if (headers)
|
||||||
@ -613,6 +613,7 @@ void *serve_file(void *data)
|
|||||||
{
|
{
|
||||||
antd_request_t *rq = (antd_request_t *)data;
|
antd_request_t *rq = (antd_request_t *)data;
|
||||||
antd_task_t *task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io);
|
antd_task_t *task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io);
|
||||||
|
antd_task_bind_event(task,rq->client->sock,0, TASK_EVT_ON_WRITABLE| TASK_EVT_ON_READABLE);
|
||||||
char *path = (char *)dvalue(rq->request, "ABS_RESOURCE_PATH");
|
char *path = (char *)dvalue(rq->request, "ABS_RESOURCE_PATH");
|
||||||
char *mime_type = (char *)dvalue(rq->request, "RESOURCE_MIME");
|
char *mime_type = (char *)dvalue(rq->request, "RESOURCE_MIME");
|
||||||
rq->client->state = ANTD_CLIENT_SERVE_FILE;
|
rq->client->state = ANTD_CLIENT_SERVE_FILE;
|
||||||
@ -772,6 +773,7 @@ void *decode_request_header(void *data)
|
|||||||
dictionary_t request = dvalue(rq->request, "REQUEST_DATA");
|
dictionary_t request = dvalue(rq->request, "REQUEST_DATA");
|
||||||
char *port_s = (char *)dvalue(xheader, "SERVER_PORT");
|
char *port_s = (char *)dvalue(xheader, "SERVER_PORT");
|
||||||
port_config_t *pcnf = (port_config_t *)dvalue(server_config.ports, port_s);
|
port_config_t *pcnf = (port_config_t *)dvalue(server_config.ports, port_s);
|
||||||
|
antd_task_t * task;
|
||||||
// first real all header
|
// first real all header
|
||||||
// this for check if web socket is enabled
|
// this for check if web socket is enabled
|
||||||
|
|
||||||
@ -805,7 +807,9 @@ void *decode_request_header(void *data)
|
|||||||
{
|
{
|
||||||
antd_error(rq->client, 413, "Payload Too Large");
|
antd_error(rq->client, 413, "Payload Too Large");
|
||||||
ERROR("Header size too large (%d): %d vs %d", rq->client->sock, header_size, HEADER_MAX_SIZE);
|
ERROR("Header size too large (%d): %d vs %d", rq->client->sock, header_size, HEADER_MAX_SIZE);
|
||||||
return antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io);
|
task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io);
|
||||||
|
antd_task_bind_event(task,rq->client->sock,0, TASK_EVT_ON_WRITABLE| TASK_EVT_ON_READABLE);
|
||||||
|
return task;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// check for content length size
|
// check for content length size
|
||||||
@ -819,7 +823,9 @@ void *decode_request_header(void *data)
|
|||||||
// dirty fix, wait for message to be sent
|
// dirty fix, wait for message to be sent
|
||||||
// 100 ms sleep
|
// 100 ms sleep
|
||||||
usleep(100000);
|
usleep(100000);
|
||||||
return antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io);
|
task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io);
|
||||||
|
antd_task_bind_event(task,rq->client->sock,0, TASK_EVT_ON_WRITABLE| TASK_EVT_ON_READABLE);
|
||||||
|
return task;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -863,7 +869,8 @@ void *decode_request_header(void *data)
|
|||||||
if (host)
|
if (host)
|
||||||
free(host);
|
free(host);
|
||||||
// header ok, now checkmethod
|
// header ok, now checkmethod
|
||||||
antd_task_t *task = antd_create_task(decode_request, (void *)rq, NULL, rq->client->last_io);
|
task = antd_create_task(decode_request, (void *)rq, NULL, rq->client->last_io);
|
||||||
|
antd_task_bind_event(task,rq->client->sock,0, TASK_EVT_ON_WRITABLE| TASK_EVT_ON_READABLE);
|
||||||
return task;
|
return task;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -882,6 +889,7 @@ void *decode_request(void *data)
|
|||||||
ws = 1;
|
ws = 1;
|
||||||
method = (char *)dvalue(rq->request, "METHOD");
|
method = (char *)dvalue(rq->request, "METHOD");
|
||||||
task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io);
|
task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io);
|
||||||
|
antd_task_bind_event(task,rq->client->sock,0, TASK_EVT_ON_WRITABLE| TASK_EVT_ON_READABLE);
|
||||||
if (strcmp(method, "GET") == 0 || strcmp(method, "HEAD") == 0 || strcmp(method, "OPTIONS") == 0)
|
if (strcmp(method, "GET") == 0 || strcmp(method, "HEAD") == 0 || strcmp(method, "OPTIONS") == 0)
|
||||||
{
|
{
|
||||||
//if(ctype) free(ctype);
|
//if(ctype) free(ctype);
|
||||||
@ -900,7 +908,6 @@ void *decode_request(void *data)
|
|||||||
else if (strcmp(method, "POST") == 0)
|
else if (strcmp(method, "POST") == 0)
|
||||||
{
|
{
|
||||||
task->handle = resolve_request;
|
task->handle = resolve_request;
|
||||||
//task->type = HEAVY;
|
|
||||||
return task;
|
return task;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@ -925,7 +932,7 @@ void *decode_post_request(void *data)
|
|||||||
clen = atoi(tmp);
|
clen = atoi(tmp);
|
||||||
char *method = (char *)dvalue(rq->request, "METHOD");
|
char *method = (char *)dvalue(rq->request, "METHOD");
|
||||||
task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io);
|
task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io);
|
||||||
task->type = HEAVY;
|
antd_task_bind_event(task,rq->client->sock,0, TASK_EVT_ON_WRITABLE| TASK_EVT_ON_READABLE);
|
||||||
if (!method || strcmp(method, "POST") != 0)
|
if (!method || strcmp(method, "POST") != 0)
|
||||||
return task;
|
return task;
|
||||||
if (ctype == NULL || clen == -1)
|
if (ctype == NULL || clen == -1)
|
||||||
@ -1041,6 +1048,7 @@ void *decode_multi_part_request(void *data, const char *ctype)
|
|||||||
int len;
|
int len;
|
||||||
antd_request_t *rq = (antd_request_t *)data;
|
antd_request_t *rq = (antd_request_t *)data;
|
||||||
antd_task_t *task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io);
|
antd_task_t *task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io);
|
||||||
|
antd_task_bind_event(task,rq->client->sock,0, TASK_EVT_ON_WRITABLE| TASK_EVT_ON_READABLE);
|
||||||
//dictionary dic = NULL;
|
//dictionary dic = NULL;
|
||||||
boundary = strsep(&str_copy, "="); //discard first part
|
boundary = strsep(&str_copy, "="); //discard first part
|
||||||
boundary = str_copy;
|
boundary = str_copy;
|
||||||
@ -1057,7 +1065,6 @@ void *decode_multi_part_request(void *data, const char *ctype)
|
|||||||
task->handle = decode_multi_part_request_data;
|
task->handle = decode_multi_part_request_data;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
task->type = HEAVY;
|
|
||||||
return task;
|
return task;
|
||||||
}
|
}
|
||||||
void *decode_multi_part_request_data(void *data)
|
void *decode_multi_part_request_data(void *data)
|
||||||
@ -1075,6 +1082,7 @@ void *decode_multi_part_request_data(void *data)
|
|||||||
char *token, *keytoken, *valtoken;
|
char *token, *keytoken, *valtoken;
|
||||||
antd_request_t *rq = (antd_request_t *)data;
|
antd_request_t *rq = (antd_request_t *)data;
|
||||||
antd_task_t *task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io);
|
antd_task_t *task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io);
|
||||||
|
antd_task_bind_event(task,rq->client->sock,0, TASK_EVT_ON_WRITABLE| TASK_EVT_ON_READABLE);
|
||||||
char *boundary = (char *)dvalue(rq->request, "MULTI_PART_BOUNDARY");
|
char *boundary = (char *)dvalue(rq->request, "MULTI_PART_BOUNDARY");
|
||||||
dictionary_t dic = (dictionary_t)dvalue(rq->request, "REQUEST_DATA");
|
dictionary_t dic = (dictionary_t)dvalue(rq->request, "REQUEST_DATA");
|
||||||
// search for content disposition:
|
// search for content disposition:
|
||||||
@ -1202,7 +1210,6 @@ void *decode_multi_part_request_data(void *data)
|
|||||||
if (line && strstr(line, boundary))
|
if (line && strstr(line, boundary))
|
||||||
{
|
{
|
||||||
// continue upload
|
// continue upload
|
||||||
task->type = HEAVY;
|
|
||||||
task->handle = decode_multi_part_request_data;
|
task->handle = decode_multi_part_request_data;
|
||||||
}
|
}
|
||||||
free(boundend);
|
free(boundend);
|
||||||
@ -1296,6 +1303,7 @@ void *execute_plugin(void *data, const char *pname)
|
|||||||
char *error;
|
char *error;
|
||||||
antd_request_t *rq = (antd_request_t *)data;
|
antd_request_t *rq = (antd_request_t *)data;
|
||||||
antd_task_t *task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io);
|
antd_task_t *task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io);
|
||||||
|
antd_task_bind_event(task,rq->client->sock,0, TASK_EVT_ON_WRITABLE| TASK_EVT_ON_READABLE);
|
||||||
//LOG("Plugin name '%s'", pname);
|
//LOG("Plugin name '%s'", pname);
|
||||||
rq->client->state = ANTD_CLIENT_PLUGIN_EXEC;
|
rq->client->state = ANTD_CLIENT_PLUGIN_EXEC;
|
||||||
//load the plugin
|
//load the plugin
|
||||||
@ -1328,12 +1336,12 @@ void *execute_plugin(void *data, const char *pname)
|
|||||||
if (meta && meta->raw_body == 1)
|
if (meta && meta->raw_body == 1)
|
||||||
{
|
{
|
||||||
task->handle = fn;
|
task->handle = fn;
|
||||||
task->type = HEAVY;
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
free(task);
|
free(task);
|
||||||
task = antd_create_task(decode_post_request, (void *)rq, fn, rq->client->last_io);
|
task = antd_create_task(decode_post_request, (void *)rq, fn, rq->client->last_io);
|
||||||
|
antd_task_bind_event(task,rq->client->sock,0, TASK_EVT_ON_WRITABLE| TASK_EVT_ON_READABLE);
|
||||||
}
|
}
|
||||||
return task;
|
return task;
|
||||||
}
|
}
|
||||||
|
2
httpd.c
2
httpd.c
@ -241,7 +241,7 @@ static void antd_monitor(port_config_t *pcnf)
|
|||||||
antd_scheduler_unlock(scheduler);
|
antd_scheduler_unlock(scheduler);
|
||||||
// create callback for the server
|
// create callback for the server
|
||||||
task = antd_create_task(accept_request, (void *)request, finish_request, client->last_io);
|
task = antd_create_task(accept_request, (void *)request, finish_request, client->last_io);
|
||||||
//task->type = LIGHT;
|
antd_task_bind_event(task,client->sock,0, TASK_EVT_ON_WRITABLE| TASK_EVT_ON_READABLE);
|
||||||
antd_scheduler_add_task(scheduler, task);
|
antd_scheduler_add_task(scheduler, task);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
261
lib/scheduler.c
261
lib/scheduler.c
@ -5,12 +5,14 @@
|
|||||||
#include <sys/types.h>
|
#include <sys/types.h>
|
||||||
#include <sys/stat.h>
|
#include <sys/stat.h>
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
|
#include <poll.h>
|
||||||
#include "scheduler.h"
|
#include "scheduler.h"
|
||||||
#include "utils.h"
|
#include "utils.h"
|
||||||
#include "bst.h"
|
#include "bst.h"
|
||||||
|
|
||||||
#define MAX_VALIDITY_INTERVAL 20
|
#define MAX_VALIDITY_INTERVAL 20
|
||||||
#define MAX_FIFO_NAME_SZ 255
|
#define MAX_FIFO_NAME_SZ 255
|
||||||
|
#define POLL_EVENT_TO 100 // ms
|
||||||
|
|
||||||
// callback definition
|
// callback definition
|
||||||
struct _antd_callback_t
|
struct _antd_callback_t
|
||||||
@ -19,23 +21,26 @@ struct _antd_callback_t
|
|||||||
struct _antd_callback_t *next;
|
struct _antd_callback_t *next;
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct {
|
typedef struct
|
||||||
int type;
|
{
|
||||||
|
int flags;
|
||||||
int fd;
|
int fd;
|
||||||
} antd_scheduler_evt_item_t;
|
int timeout; // seconds
|
||||||
|
antd_task_t *task;
|
||||||
|
} antd_task_evt_item_t;
|
||||||
|
|
||||||
struct _antd_queue_item_t
|
struct _antd_queue_item_t
|
||||||
{
|
{
|
||||||
union
|
union
|
||||||
{
|
{
|
||||||
antd_scheduler_evt_item_t* evt;
|
antd_task_evt_item_t *evt;
|
||||||
antd_task_t *task;
|
antd_task_t *task;
|
||||||
void * raw_ptr;
|
void *raw_ptr;
|
||||||
};
|
};
|
||||||
struct _antd_queue_item_t *next;
|
struct _antd_queue_item_t *next;
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct _antd_queue_item_t* antd_queue_item_t;
|
typedef struct _antd_queue_item_t *antd_queue_item_t;
|
||||||
|
|
||||||
typedef antd_queue_item_t antd_queue_t;
|
typedef antd_queue_item_t antd_queue_t;
|
||||||
|
|
||||||
@ -70,7 +75,8 @@ struct _antd_scheduler_t
|
|||||||
|
|
||||||
static antd_callback_t *callback_of(void *(*callback)(void *));
|
static antd_callback_t *callback_of(void *(*callback)(void *));
|
||||||
static void antd_execute_task(antd_scheduler_t *, antd_task_t *);
|
static void antd_execute_task(antd_scheduler_t *, antd_task_t *);
|
||||||
static int antd_task_schedule(antd_scheduler_t *);
|
static void antd_task_schedule(antd_scheduler_t *scheduler, antd_task_t *task);
|
||||||
|
static void destroy_task(void *data);
|
||||||
|
|
||||||
static void set_nonblock(int fd)
|
static void set_nonblock(int fd)
|
||||||
{
|
{
|
||||||
@ -179,11 +185,12 @@ static void execute_callback(antd_scheduler_t *scheduler, antd_task_t *task)
|
|||||||
task->handle = cb->handle;
|
task->handle = cb->handle;
|
||||||
task->callback = task->callback->next;
|
task->callback = task->callback->next;
|
||||||
free(cb);
|
free(cb);
|
||||||
|
//antd_task_bind_event(task, 0, 0, TASK_EVT_ALWAY_ON);
|
||||||
antd_scheduler_add_task(scheduler, task);
|
antd_scheduler_add_task(scheduler, task);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
free(task);
|
destroy_task(task);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -193,20 +200,14 @@ static void destroy_queue(antd_queue_t q, int is_task)
|
|||||||
it = q;
|
it = q;
|
||||||
while (it)
|
while (it)
|
||||||
{
|
{
|
||||||
if(is_task)
|
if (is_task)
|
||||||
{
|
{
|
||||||
// first free the task
|
// first free the task
|
||||||
if (it->task && it->task->callback)
|
destroy_task(it->task);
|
||||||
{
|
|
||||||
free_callback(it->task->callback);
|
|
||||||
it->task->callback = NULL;
|
|
||||||
}
|
|
||||||
if (it->task)
|
|
||||||
free(it->task);
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
if(it->raw_ptr)
|
if (it->raw_ptr)
|
||||||
{
|
{
|
||||||
free(it->raw_ptr);
|
free(it->raw_ptr);
|
||||||
}
|
}
|
||||||
@ -264,9 +265,6 @@ static void print_static_info(bst_node_t *node, void **args, int argc)
|
|||||||
snprintf(buffer, MAX_FIFO_NAME_SZ, "Current time: %lu\n", (unsigned long)time(NULL));
|
snprintf(buffer, MAX_FIFO_NAME_SZ, "Current time: %lu\n", (unsigned long)time(NULL));
|
||||||
ret = write(*fdp, buffer, strlen(buffer));
|
ret = write(*fdp, buffer, strlen(buffer));
|
||||||
|
|
||||||
snprintf(buffer, MAX_FIFO_NAME_SZ, "Task type: %d\n", task->type);
|
|
||||||
ret = write(*fdp, buffer, strlen(buffer));
|
|
||||||
|
|
||||||
if (task->handle)
|
if (task->handle)
|
||||||
{
|
{
|
||||||
snprintf(buffer, MAX_FIFO_NAME_SZ, "Has handle: yes\n");
|
snprintf(buffer, MAX_FIFO_NAME_SZ, "Has handle: yes\n");
|
||||||
@ -460,8 +458,18 @@ antd_scheduler_t *antd_scheduler_init(int n, const char *stat_name)
|
|||||||
static void destroy_task(void *data)
|
static void destroy_task(void *data)
|
||||||
{
|
{
|
||||||
antd_task_t *task = (antd_task_t *)data;
|
antd_task_t *task = (antd_task_t *)data;
|
||||||
if (task && task->callback)
|
if (!task)
|
||||||
|
return;
|
||||||
|
if (task->callback)
|
||||||
|
{
|
||||||
free_callback(task->callback);
|
free_callback(task->callback);
|
||||||
|
task->callback = NULL;
|
||||||
|
}
|
||||||
|
if (task->events)
|
||||||
|
{
|
||||||
|
destroy_queue(task->events, 0);
|
||||||
|
task->events = NULL;
|
||||||
|
}
|
||||||
if (task)
|
if (task)
|
||||||
free(task);
|
free(task);
|
||||||
}
|
}
|
||||||
@ -479,14 +487,14 @@ void antd_scheduler_destroy(antd_scheduler_t *scheduler)
|
|||||||
LOG("Destroy remaining queue");
|
LOG("Destroy remaining queue");
|
||||||
bst_free_cb(scheduler->task_queue, destroy_task);
|
bst_free_cb(scheduler->task_queue, destroy_task);
|
||||||
scheduler->task_queue = NULL;
|
scheduler->task_queue = NULL;
|
||||||
destroy_queue(scheduler->workers_queue,1);
|
destroy_queue(scheduler->workers_queue, 1);
|
||||||
free(scheduler);
|
free(scheduler);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
create a task
|
create a task
|
||||||
*/
|
*/
|
||||||
antd_task_t *antd_mktask(void *(*handle)(void *), void *data, void *(*callback)(void *), time_t atime, antd_task_type_t type)
|
antd_task_t *antd_create_task(void *(*handle)(void *), void *data, void *(*callback)(void *), time_t atime)
|
||||||
{
|
{
|
||||||
antd_task_t *task = (antd_task_t *)malloc(sizeof *task);
|
antd_task_t *task = (antd_task_t *)malloc(sizeof *task);
|
||||||
task->stamp = (unsigned long)time(NULL);
|
task->stamp = (unsigned long)time(NULL);
|
||||||
@ -494,8 +502,8 @@ antd_task_t *antd_mktask(void *(*handle)(void *), void *data, void *(*callback)(
|
|||||||
task->handle = handle;
|
task->handle = handle;
|
||||||
task->id = antd_task_data_id(data);
|
task->id = antd_task_data_id(data);
|
||||||
task->callback = callback_of(callback);
|
task->callback = callback_of(callback);
|
||||||
task->type = type;
|
|
||||||
task->access_time = atime;
|
task->access_time = atime;
|
||||||
|
task->events = NULL;
|
||||||
return task;
|
return task;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -504,7 +512,7 @@ antd_task_t *antd_mktask(void *(*handle)(void *), void *data, void *(*callback)(
|
|||||||
*/
|
*/
|
||||||
void antd_scheduler_add_task(antd_scheduler_t *scheduler, antd_task_t *task)
|
void antd_scheduler_add_task(antd_scheduler_t *scheduler, antd_task_t *task)
|
||||||
{
|
{
|
||||||
if(task->id == 0)
|
if (task->id == 0)
|
||||||
task->id = antd_scheduler_next_id(scheduler, task->id);
|
task->id = antd_scheduler_next_id(scheduler, task->id);
|
||||||
pthread_mutex_lock(&scheduler->scheduler_lock);
|
pthread_mutex_lock(&scheduler->scheduler_lock);
|
||||||
scheduler->task_queue = bst_insert(scheduler->task_queue, task->id, (void *)task);
|
scheduler->task_queue = bst_insert(scheduler->task_queue, task->id, (void *)task);
|
||||||
@ -541,17 +549,18 @@ static void antd_execute_task(antd_scheduler_t *scheduler, antd_task_t *task)
|
|||||||
{
|
{
|
||||||
rtask->callback = task->callback;
|
rtask->callback = task->callback;
|
||||||
}
|
}
|
||||||
|
task->callback = NULL;
|
||||||
}
|
}
|
||||||
if (!rtask->handle)
|
if (!rtask->handle)
|
||||||
{
|
{
|
||||||
// call the first callback
|
// call the first callback
|
||||||
execute_callback(scheduler, rtask);
|
execute_callback(scheduler, rtask);
|
||||||
free(task);
|
destroy_task(task);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
antd_scheduler_add_task(scheduler, rtask);
|
antd_scheduler_add_task(scheduler, rtask);
|
||||||
free(task);
|
destroy_task(task);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -571,23 +580,12 @@ void antd_scheduler_unlock(antd_scheduler_t *sched)
|
|||||||
pthread_mutex_unlock(&sched->scheduler_lock);
|
pthread_mutex_unlock(&sched->scheduler_lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int antd_task_schedule(antd_scheduler_t *scheduler)
|
static void antd_task_schedule(antd_scheduler_t *scheduler, antd_task_t *task)
|
||||||
{
|
{
|
||||||
// fetch next task from the task_queue
|
|
||||||
antd_task_t *task = NULL;
|
|
||||||
bst_node_t *node;
|
|
||||||
pthread_mutex_lock(&scheduler->scheduler_lock);
|
|
||||||
node = bst_find_min(scheduler->task_queue);
|
|
||||||
if (node)
|
|
||||||
{
|
|
||||||
task = (antd_task_t *)node->data;
|
|
||||||
scheduler->task_queue = bst_delete(scheduler->task_queue, node->key);
|
|
||||||
}
|
|
||||||
pthread_mutex_unlock(&scheduler->scheduler_lock);
|
|
||||||
// no task
|
// no task
|
||||||
if (!task)
|
if (!task)
|
||||||
{
|
{
|
||||||
return 0;
|
return;
|
||||||
}
|
}
|
||||||
pthread_mutex_lock(&scheduler->pending_lock);
|
pthread_mutex_lock(&scheduler->pending_lock);
|
||||||
scheduler->pending_task--;
|
scheduler->pending_task--;
|
||||||
@ -600,18 +598,11 @@ static int antd_task_schedule(antd_scheduler_t *scheduler)
|
|||||||
// data task is not valid
|
// data task is not valid
|
||||||
LOG("Task is no longer valid and will be killed");
|
LOG("Task is no longer valid and will be killed");
|
||||||
antd_scheduler_destroy_data(task->data);
|
antd_scheduler_destroy_data(task->data);
|
||||||
if (task->callback)
|
destroy_task(task);
|
||||||
{
|
return;
|
||||||
free_callback(task->callback);
|
|
||||||
task->callback = NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
free(task);
|
if (scheduler->n_workers <= 0)
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
// check the type of task
|
|
||||||
if (task->type == LIGHT || scheduler->n_workers <= 0)
|
|
||||||
{
|
{
|
||||||
// do it by myself
|
// do it by myself
|
||||||
antd_execute_task(scheduler, task);
|
antd_execute_task(scheduler, task);
|
||||||
@ -626,16 +617,170 @@ static int antd_task_schedule(antd_scheduler_t *scheduler)
|
|||||||
// wake up idle worker
|
// wake up idle worker
|
||||||
sem_post(scheduler->worker_sem);
|
sem_post(scheduler->worker_sem);
|
||||||
}
|
}
|
||||||
return 1;
|
|
||||||
}
|
}
|
||||||
void* antd_scheduler_wait(void* ptr)
|
|
||||||
|
static void task_polls_collect(bst_node_t* node, void** argv, int argc)
|
||||||
{
|
{
|
||||||
int stat;
|
UNUSED(argc);
|
||||||
antd_scheduler_t *scheduler = (antd_scheduler_t *) ptr;
|
antd_task_evt_item_t* it = (antd_task_evt_item_t*)node->data;
|
||||||
|
struct pollfd* pfds = (struct pollfd*)argv[0];
|
||||||
|
if(it)
|
||||||
|
{
|
||||||
|
pfds[node->key].fd = it->fd;
|
||||||
|
if(it->flags & TASK_EVT_ON_READABLE)
|
||||||
|
{
|
||||||
|
pfds[node->key].events |= POLLIN;
|
||||||
|
}
|
||||||
|
if(it->flags & TASK_EVT_ON_WRITABLE)
|
||||||
|
{
|
||||||
|
pfds[node->key].events |= POLLOUT;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void task_event_collect(bst_node_t* node, void** argv, int argc)
|
||||||
|
{
|
||||||
|
UNUSED(argc);
|
||||||
|
antd_task_t* task = (antd_task_t*) node->data;
|
||||||
|
antd_queue_t* exec_list = (antd_queue_t*) argv[0];
|
||||||
|
bst_node_t** poll_list = (bst_node_t**) argv[1];
|
||||||
|
int* pollsize = (int*) argv[2];
|
||||||
|
|
||||||
|
if(!task->events)
|
||||||
|
{
|
||||||
|
enqueue(exec_list, task);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
antd_queue_item_t it = task->events;
|
||||||
|
while(it)
|
||||||
|
{
|
||||||
|
if(it->evt->flags & TASK_EVT_ALWAY_ON)
|
||||||
|
{
|
||||||
|
enqueue(exec_list, task);
|
||||||
|
}
|
||||||
|
else if(it->evt->flags & TASK_EVT_ON_TIMEOUT)
|
||||||
|
{
|
||||||
|
// check if timeout
|
||||||
|
if(difftime(time(NULL),task->stamp) > it->evt->timeout )
|
||||||
|
{
|
||||||
|
enqueue(exec_list, task);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
*poll_list = bst_insert(*poll_list, *pollsize, it->evt);
|
||||||
|
*pollsize = (*pollsize)+1;
|
||||||
|
}
|
||||||
|
it = it->next;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void antd_task_bind_event(antd_task_t *task, int fd, int timeout, int flags)
|
||||||
|
{
|
||||||
|
antd_task_evt_item_t *eit = (antd_task_evt_item_t *)malloc(sizeof(antd_task_evt_item_t));
|
||||||
|
eit->fd = fd;
|
||||||
|
eit->timeout = timeout;
|
||||||
|
eit->flags = flags;
|
||||||
|
eit->task = task;
|
||||||
|
enqueue(&task->events, eit);
|
||||||
|
}
|
||||||
|
|
||||||
|
void *antd_scheduler_wait(void *ptr)
|
||||||
|
{
|
||||||
|
int pollsize, ready;
|
||||||
|
void *argv[3];
|
||||||
|
antd_queue_t exec_list = NULL;
|
||||||
|
bst_node_t* poll_list = NULL;
|
||||||
|
antd_queue_item_t it = NULL;
|
||||||
|
antd_queue_item_t curr = NULL;
|
||||||
|
antd_task_evt_item_t *eit = NULL;
|
||||||
|
bst_node_t* node = NULL;
|
||||||
|
struct pollfd *pfds = NULL;
|
||||||
|
antd_scheduler_t *scheduler = (antd_scheduler_t *)ptr;
|
||||||
|
|
||||||
while (scheduler->status)
|
while (scheduler->status)
|
||||||
{
|
{
|
||||||
stat = antd_task_schedule(scheduler);
|
pollsize = 0;
|
||||||
if (!stat)
|
argv[0] = &exec_list;
|
||||||
|
argv[1] = &poll_list;
|
||||||
|
argv[2] = &pollsize;
|
||||||
|
pthread_mutex_lock(&scheduler->scheduler_lock);
|
||||||
|
bst_for_each(scheduler->task_queue, task_event_collect, argv, 3);
|
||||||
|
pthread_mutex_unlock(&scheduler->scheduler_lock);
|
||||||
|
// schedule exec list first
|
||||||
|
it = exec_list;
|
||||||
|
while(it)
|
||||||
|
{
|
||||||
|
if(it->task)
|
||||||
|
{
|
||||||
|
|
||||||
|
pthread_mutex_lock(&scheduler->scheduler_lock);
|
||||||
|
scheduler->task_queue = bst_delete(scheduler->task_queue, it->task->id);
|
||||||
|
pthread_mutex_unlock(&scheduler->scheduler_lock);
|
||||||
|
antd_task_schedule(scheduler, it->task);
|
||||||
|
}
|
||||||
|
curr = it;
|
||||||
|
it = it->next;
|
||||||
|
free(curr);
|
||||||
|
}
|
||||||
|
// Detect event on pollist
|
||||||
|
if(pollsize > 0)
|
||||||
|
{
|
||||||
|
pfds = (struct pollfd*)malloc(pollsize*sizeof(struct pollfd));
|
||||||
|
memset(pfds, 0, pollsize*sizeof(struct pollfd));
|
||||||
|
if(pfds)
|
||||||
|
{
|
||||||
|
argv[0] = pfds;
|
||||||
|
bst_for_each(poll_list,task_polls_collect, argv, 1);
|
||||||
|
// now poll event
|
||||||
|
ready = poll(pfds, pollsize, POLL_EVENT_TO);
|
||||||
|
if(ready == -1)
|
||||||
|
{
|
||||||
|
// this should not happends
|
||||||
|
ERROR("Unable to poll: %s", strerror(errno));
|
||||||
|
// TODO: exit ?
|
||||||
|
}
|
||||||
|
else if(ready > 0)
|
||||||
|
{
|
||||||
|
for (int i = 0; i < pollsize; i++)
|
||||||
|
{
|
||||||
|
// find the event
|
||||||
|
node = bst_find(poll_list,i);
|
||||||
|
if(node)
|
||||||
|
eit = (antd_task_evt_item_t *)node->data;
|
||||||
|
if(eit)
|
||||||
|
{
|
||||||
|
if( ((eit->flags & TASK_EVT_ON_READABLE) && (pfds[i].revents & POLLIN))
|
||||||
|
|| ( (eit->flags & TASK_EVT_ON_WRITABLE) && (pfds[i].revents & POLLOUT))
|
||||||
|
) {
|
||||||
|
// event triggered schedule the task
|
||||||
|
pthread_mutex_lock(&scheduler->scheduler_lock);
|
||||||
|
scheduler->task_queue = bst_delete(scheduler->task_queue, eit->task->id);
|
||||||
|
pthread_mutex_unlock(&scheduler->scheduler_lock);
|
||||||
|
antd_task_schedule(scheduler, eit->task);
|
||||||
|
}
|
||||||
|
else if( (pfds[i].revents & POLLERR) || (pfds[i].revents & POLLHUP) ) {
|
||||||
|
// task is no longer available
|
||||||
|
ERROR("Poll: Task %d is no longer valid. Remove it", eit->task->id);
|
||||||
|
// remove task from task queue
|
||||||
|
pthread_mutex_lock(&scheduler->scheduler_lock);
|
||||||
|
scheduler->task_queue = bst_delete(scheduler->task_queue, eit->task->id);
|
||||||
|
pthread_mutex_unlock(&scheduler->scheduler_lock);
|
||||||
|
antd_scheduler_destroy_data(eit->task->data);
|
||||||
|
destroy_task(eit->task);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
free(pfds);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
exec_list = NULL;
|
||||||
|
bst_free(poll_list);
|
||||||
|
poll_list = NULL;
|
||||||
|
|
||||||
|
if (!scheduler->task_queue)
|
||||||
{
|
{
|
||||||
// no task found, go to idle state
|
// no task found, go to idle state
|
||||||
sem_wait(scheduler->scheduler_sem);
|
sem_wait(scheduler->scheduler_sem);
|
||||||
@ -672,7 +817,9 @@ void antd_scheduler_ext_statistic(int fd, void *data)
|
|||||||
}
|
}
|
||||||
int antd_scheduler_validate_data(antd_task_t *task)
|
int antd_scheduler_validate_data(antd_task_t *task)
|
||||||
{
|
{
|
||||||
return !(difftime(time(NULL), task->access_time) > MAX_VALIDITY_INTERVAL);
|
UNUSED(task);
|
||||||
|
return 1;
|
||||||
|
//!(difftime(time(NULL), task->access_time) > MAX_VALIDITY_INTERVAL);
|
||||||
}
|
}
|
||||||
void antd_scheduler_destroy_data(void *data)
|
void antd_scheduler_destroy_data(void *data)
|
||||||
{
|
{
|
||||||
|
@ -5,17 +5,16 @@
|
|||||||
#include <semaphore.h>
|
#include <semaphore.h>
|
||||||
#include <stdint.h>
|
#include <stdint.h>
|
||||||
|
|
||||||
#define antd_create_task(h, d, c, t) (antd_mktask(h, d, c, t, HEAVY))
|
|
||||||
|
|
||||||
typedef enum
|
// define the event
|
||||||
{
|
#define TASK_EVT_ALWAY_ON 0x01
|
||||||
LIGHT,
|
#define TASK_EVT_ON_READABLE 0x02
|
||||||
HEAVY
|
#define TASK_EVT_ON_WRITABLE 0x04
|
||||||
} antd_task_type_t;
|
#define TASK_EVT_ON_TIMEOUT 0x08
|
||||||
|
|
||||||
typedef struct _antd_scheduler_t antd_scheduler_t;
|
typedef struct _antd_scheduler_t antd_scheduler_t;
|
||||||
typedef struct _antd_callback_t antd_callback_t;
|
typedef struct _antd_callback_t antd_callback_t;
|
||||||
typedef struct _antd_queue_item_t antd_scheduler_evt_t;
|
typedef struct _antd_queue_item_t* antd_task_evt_list_t;
|
||||||
typedef struct
|
typedef struct
|
||||||
{
|
{
|
||||||
/**
|
/**
|
||||||
@ -42,17 +41,11 @@ typedef struct
|
|||||||
* one or more event, otherwise it will be
|
* one or more event, otherwise it will be
|
||||||
* rejected by the scheduler
|
* rejected by the scheduler
|
||||||
* */
|
* */
|
||||||
antd_scheduler_evt_t* events;
|
antd_task_evt_list_t events;
|
||||||
/**
|
/**
|
||||||
* user data if any
|
* user data if any
|
||||||
*/
|
*/
|
||||||
void *data;
|
void *data;
|
||||||
/**
|
|
||||||
* type of a task
|
|
||||||
* light tasks are executed directly
|
|
||||||
* heavy tasks are delegated to workers
|
|
||||||
*/
|
|
||||||
antd_task_type_t type;
|
|
||||||
} antd_task_t;
|
} antd_task_t;
|
||||||
/*
|
/*
|
||||||
* nit the main scheduler
|
* nit the main scheduler
|
||||||
@ -71,8 +64,17 @@ void antd_scheduler_destroy(antd_scheduler_t *);
|
|||||||
* - callback
|
* - callback
|
||||||
* - last data access time
|
* - last data access time
|
||||||
*/
|
*/
|
||||||
antd_task_t *antd_mktask(void *(*handle)(void *), void *data, void *(*callback)(void *), time_t, antd_task_type_t type);
|
antd_task_t *antd_create_task(void *(*handle)(void *), void *data, void *(*callback)(void *), time_t);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ALWAY_ON flag doest not need a file descriptor, it will be executed immediately by the scheduler
|
||||||
|
* ANY file descriptor should work with READABLE and WRITABLE flags, including timerfd for precision timeout
|
||||||
|
* Timeout flag (in seconds precision): val is the number of seconds
|
||||||
|
*
|
||||||
|
* File descriptor close operation is not handled by the scheduler
|
||||||
|
*
|
||||||
|
* */
|
||||||
|
void antd_task_bind_event(antd_task_t* task, int fd, int timeout, int flags);
|
||||||
/**
|
/**
|
||||||
* add a task
|
* add a task
|
||||||
*/
|
*/
|
||||||
@ -90,7 +92,7 @@ int antd_scheduler_ok(antd_scheduler_t *scheduler);
|
|||||||
*
|
*
|
||||||
* wait for event
|
* wait for event
|
||||||
*/
|
*/
|
||||||
void* antd_scheduler_wait(void *);
|
void *antd_scheduler_wait(void *);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* lock the scheduler
|
* lock the scheduler
|
||||||
@ -99,7 +101,7 @@ void antd_scheduler_lock(antd_scheduler_t *);
|
|||||||
/**
|
/**
|
||||||
* Get next valid task id
|
* Get next valid task id
|
||||||
* */
|
* */
|
||||||
int antd_scheduler_next_id(antd_scheduler_t* sched, int input);
|
int antd_scheduler_next_id(antd_scheduler_t *sched, int input);
|
||||||
/**
|
/**
|
||||||
* unlock the scheduler
|
* unlock the scheduler
|
||||||
* */
|
* */
|
||||||
|
Loading…
Reference in New Issue
Block a user