ant-http/lib/scheduler.c

902 lines
26 KiB
C
Raw Permalink Normal View History

2020-08-25 16:40:24 +02:00
#include <fcntl.h>
#include <errno.h>
#include <string.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <sys/time.h>
2021-01-03 21:31:35 +01:00
#include <poll.h>
2024-03-12 21:59:50 +01:00
#include <pthread.h>
#include <semaphore.h>
2018-09-25 17:51:56 +02:00
#include "scheduler.h"
2020-08-25 16:40:24 +02:00
#include "utils.h"
2021-01-03 11:24:55 +01:00
#include "bst.h"
#define MAX_VALIDITY_INTERVAL 30
#define MAX_NAME_SZ 255
2021-01-03 11:24:55 +01:00
// callback definition
struct _antd_callback_t
{
void *(*handle)(void *);
struct _antd_callback_t *next;
};
2021-01-03 21:31:35 +01:00
typedef struct
{
int flags;
2021-01-03 11:24:55 +01:00
int fd;
struct timeval stamp;
2021-01-03 21:31:35 +01:00
int timeout; // seconds
antd_task_t *task;
} antd_task_evt_item_t;
2021-01-03 11:24:55 +01:00
struct _antd_queue_item_t
{
union
{
2021-01-03 21:31:35 +01:00
antd_task_evt_item_t *evt;
2021-01-03 11:24:55 +01:00
antd_task_t *task;
2021-01-03 21:31:35 +01:00
void *raw_ptr;
2021-01-03 11:24:55 +01:00
};
struct _antd_queue_item_t *next;
2021-01-03 21:31:35 +01:00
};
2021-01-03 11:24:55 +01:00
2021-01-03 21:31:35 +01:00
typedef struct _antd_queue_item_t *antd_queue_item_t;
2021-01-03 11:24:55 +01:00
typedef antd_queue_item_t antd_queue_t;
typedef struct
{
int id;
pthread_t tid;
antd_task_t* current_task;
2021-01-03 11:24:55 +01:00
void *manager;
} antd_worker_t;
struct _antd_scheduler_t
{
// data lock
pthread_mutex_t scheduler_lock;
pthread_mutex_t worker_lock;
pthread_mutex_t pending_lock;
// event handle
sem_t *scheduler_sem;
sem_t *worker_sem;
// worker and data
bst_node_t *task_queue;
antd_queue_t workers_queue;
uint8_t status; // 0 stop, 1 working
antd_worker_t *workers;
int n_workers;
int pending_task;
int id_allocator;
char stat_fifo[MAX_NAME_SZ];
char sched_name[MAX_NAME_SZ];
char worker_hub[MAX_NAME_SZ];
2021-01-03 11:24:55 +01:00
int stat_fd;
pthread_t stat_tid;
};
static antd_callback_t *callback_of(void *(*callback)(void *));
static void antd_execute_task(antd_scheduler_t *, antd_task_t *);
2021-01-03 21:31:35 +01:00
static void antd_task_schedule(antd_scheduler_t *scheduler, antd_task_t *task);
static void destroy_task(void *data);
2018-09-25 17:51:56 +02:00
static void set_nonblock(int fd)
{
int flags;
flags = fcntl(fd, F_GETFL, 0);
if (flags == -1)
{
ERROR("Unable to set flag");
}
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
2021-01-03 11:24:55 +01:00
static void enqueue(antd_queue_t *q, void *data)
2018-09-25 17:51:56 +02:00
{
2021-01-03 11:24:55 +01:00
antd_queue_item_t it = *q;
while (it && it->next != NULL)
2018-09-25 17:51:56 +02:00
it = it->next;
2021-01-03 11:24:55 +01:00
antd_queue_item_t new_it = (antd_queue_item_t)malloc(sizeof *new_it);
new_it->raw_ptr = data;
new_it->next = NULL;
if (!it) // first task
2018-09-25 17:51:56 +02:00
{
2021-01-03 11:24:55 +01:00
*q = new_it;
2018-09-25 17:51:56 +02:00
}
else
{
2021-01-03 11:24:55 +01:00
it->next = new_it;
2018-09-25 17:51:56 +02:00
}
}
static void stop(antd_scheduler_t *scheduler)
2018-09-25 17:51:56 +02:00
{
2018-10-01 22:49:20 +02:00
scheduler->status = 0;
2018-10-10 12:42:47 +02:00
// unlock all idle workers if any
2018-10-01 22:49:20 +02:00
for (int i = 0; i < scheduler->n_workers; i++)
2018-10-14 10:57:09 +02:00
sem_post(scheduler->worker_sem);
if (scheduler->scheduler_sem)
2019-12-20 16:49:41 +01:00
sem_post(scheduler->scheduler_sem);
2018-10-10 12:42:47 +02:00
for (int i = 0; i < scheduler->n_workers; i++)
if (scheduler->workers[i].id != -1)
2018-10-10 12:42:47 +02:00
pthread_join(scheduler->workers[i].tid, NULL);
if (scheduler->workers)
free(scheduler->workers);
if(scheduler->stat_tid)
(void)pthread_cancel(scheduler->stat_tid);
2018-09-27 15:00:19 +02:00
// destroy all the mutex
2018-10-01 22:49:20 +02:00
pthread_mutex_destroy(&scheduler->scheduler_lock);
pthread_mutex_destroy(&scheduler->worker_lock);
pthread_mutex_destroy(&scheduler->pending_lock);
sem_unlink(scheduler->sched_name);
sem_unlink(scheduler->worker_hub);
2018-10-14 10:57:09 +02:00
sem_close(scheduler->scheduler_sem);
sem_close(scheduler->worker_sem);
2018-09-25 17:51:56 +02:00
}
2021-01-03 11:24:55 +01:00
static antd_queue_item_t dequeue(antd_queue_t *q)
2018-09-25 17:51:56 +02:00
{
2021-01-03 11:24:55 +01:00
antd_queue_item_t it = *q;
if (it)
2018-09-25 17:51:56 +02:00
{
2018-09-27 15:00:19 +02:00
*q = it->next;
it->next = NULL;
2018-09-25 17:51:56 +02:00
}
return it;
}
2021-01-03 11:24:55 +01:00
static antd_callback_t *callback_of(void *(*callback)(void *))
2018-09-25 17:51:56 +02:00
{
antd_callback_t *cb = NULL;
if (callback)
2018-09-25 17:51:56 +02:00
{
cb = (antd_callback_t *)malloc(sizeof *cb);
2018-09-25 17:51:56 +02:00
cb->handle = callback;
cb->next = NULL;
}
return cb;
}
static void free_callback(antd_callback_t *cb)
2018-09-25 17:51:56 +02:00
{
antd_callback_t *it = cb;
antd_callback_t *curr;
while (it)
2018-09-25 17:51:56 +02:00
{
curr = it;
it = it->next;
free(curr);
}
}
static void enqueue_callback(antd_callback_t *cb, antd_callback_t *el)
2018-09-25 17:51:56 +02:00
{
antd_callback_t *it = cb;
while (it && it->next != NULL)
2018-09-25 17:51:56 +02:00
it = it->next;
if (!it)
return; // this should not happend
2018-09-25 17:51:56 +02:00
it->next = el;
}
static void execute_callback(antd_scheduler_t *scheduler, antd_task_t *task)
2018-09-25 17:51:56 +02:00
{
antd_callback_t *cb = task->callback;
if (cb)
2018-09-25 17:51:56 +02:00
{
// call the first come call back
task->handle = cb->handle;
task->callback = task->callback->next;
free(cb);
2021-01-03 21:31:35 +01:00
//antd_task_bind_event(task, 0, 0, TASK_EVT_ALWAY_ON);
2021-01-03 11:24:55 +01:00
antd_scheduler_add_task(scheduler, task);
2018-09-25 17:51:56 +02:00
}
else
{
2021-01-03 21:31:35 +01:00
destroy_task(task);
2018-09-25 17:51:56 +02:00
}
}
2021-01-03 11:24:55 +01:00
static void destroy_queue(antd_queue_t q, int is_task)
2018-09-27 15:00:19 +02:00
{
2021-01-03 11:24:55 +01:00
antd_queue_item_t it, curr;
2018-09-27 15:00:19 +02:00
it = q;
while (it)
2018-09-27 15:00:19 +02:00
{
2021-01-03 21:31:35 +01:00
if (is_task)
2021-01-03 11:24:55 +01:00
{
// first free the task
2021-01-03 21:31:35 +01:00
destroy_task(it->task);
2021-01-03 11:24:55 +01:00
}
else
{
2021-01-03 21:31:35 +01:00
if (it->raw_ptr)
2021-01-03 11:24:55 +01:00
{
free(it->raw_ptr);
it->raw_ptr = NULL;
2021-01-03 11:24:55 +01:00
}
2021-01-03 21:31:35 +01:00
}
2018-09-27 15:00:19 +02:00
// then free the placeholder
curr = it;
it = it->next;
free(curr);
}
}
static void *work(antd_worker_t *worker)
2018-09-27 17:18:31 +02:00
{
antd_scheduler_t *scheduler = (antd_scheduler_t *)worker->manager;
while (scheduler->status)
2018-09-27 17:18:31 +02:00
{
2021-01-03 11:24:55 +01:00
antd_queue_item_t it;
2018-10-01 22:49:20 +02:00
pthread_mutex_lock(&scheduler->worker_lock);
it = dequeue(&scheduler->workers_queue);
pthread_mutex_unlock(&scheduler->worker_lock);
// execute the task
//LOG("task executed by worker %d\n", worker->pid);
2018-10-10 12:42:47 +02:00
// no task to execute, just sleep wait
if (!it)
2018-10-09 17:24:00 +02:00
{
2018-10-10 12:42:47 +02:00
//LOG("Worker %d goes to idle state\n", worker->id);
2018-10-14 10:57:09 +02:00
sem_wait(scheduler->worker_sem);
2018-10-09 17:24:00 +02:00
}
2018-10-10 12:42:47 +02:00
else
{
worker->current_task = it->task;
2018-10-10 12:42:47 +02:00
//LOG("task executed by worker %d\n", worker->id);
2021-01-03 11:24:55 +01:00
antd_execute_task(scheduler, it->task);
free(it);
worker->current_task = NULL;
2018-10-10 12:42:47 +02:00
}
}
return NULL;
}
static void antd_task_dump(int fd, antd_task_t* task, char* buffer)
2021-01-03 11:24:55 +01:00
{
if (task == NULL || fd < 0)
2021-01-03 11:24:55 +01:00
{
return;
}
int ret;
// send statistic on task data
snprintf(buffer, MAX_NAME_SZ, "---- Task %d created at: %lu ----\n", task->id, task->stamp);
ret = write(fd, buffer, strlen(buffer));
2021-01-03 11:24:55 +01:00
// send statistic on task data
snprintf(buffer, MAX_NAME_SZ, "Access time: %lu\nn", (unsigned long)task->access_time);
ret = write(fd, buffer, strlen(buffer));
2021-01-03 11:24:55 +01:00
snprintf(buffer, MAX_NAME_SZ, "Current time: %lu\n", (unsigned long)time(NULL));
ret = write(fd, buffer, strlen(buffer));
2021-01-03 11:24:55 +01:00
if (task->handle)
{
snprintf(buffer, MAX_NAME_SZ, "Has handle: yes\n");
ret = write(fd, buffer, strlen(buffer));
2021-01-03 11:24:55 +01:00
}
2021-01-03 11:24:55 +01:00
if (task->callback)
{
snprintf(buffer, MAX_NAME_SZ, "Has callback: yes\n");
ret = write(fd, buffer, strlen(buffer));
2021-01-03 11:24:55 +01:00
}
UNUSED(ret);
// now print all task data statistic
antd_scheduler_ext_statistic(fd, task->data);
}
static void print_static_info(bst_node_t *node, void **args, int argc)
{
if (argc != 2)
{
return;
}
char *buffer = args[0];
int *fdp = args[1];
antd_task_t *task = (antd_task_t *)node->data;
antd_task_dump(*fdp, task, buffer);
2021-01-03 11:24:55 +01:00
}
static void *statistic(antd_scheduler_t *scheduler)
{
2022-08-30 17:36:19 +02:00
struct pollfd pfd;
int ret;
char buffer[MAX_NAME_SZ];
2021-01-03 11:24:55 +01:00
void *argc[2];
while (scheduler->status)
{
if (scheduler->stat_fd == -1)
{
scheduler->stat_fd = open(scheduler->stat_fifo, O_WRONLY);
if (scheduler->stat_fd == -1)
{
ERROR("Unable to open FIFO %s: %s", scheduler->stat_fifo, strerror(errno));
return NULL;
}
else
{
set_nonblock(scheduler->stat_fd);
}
}
2021-01-03 11:24:55 +01:00
argc[0] = buffer;
argc[1] = &scheduler->stat_fd;
2022-08-30 17:36:19 +02:00
pfd.fd = scheduler->stat_fd;
pfd.events = POLLOUT;
ret = poll(&pfd, 1, -1);
switch (ret)
{
case -1:
ERROR("Error on select(): %s\n", strerror(errno));
close(scheduler->stat_fd);
return NULL;
case 0:
break;
// we have data
default:
2022-08-30 17:36:19 +02:00
if (pfd.revents & POLLOUT)
{
if (scheduler->pending_task > 0)
{
pthread_mutex_lock(&scheduler->scheduler_lock);
// write statistic data
snprintf(buffer, MAX_NAME_SZ, "Pending task: %d. Detail:\n", scheduler->pending_task);
ret = write(scheduler->stat_fd, buffer, strlen(buffer));
2021-01-03 11:24:55 +01:00
bst_for_each(scheduler->task_queue, print_static_info, argc, 2);
pthread_mutex_unlock(&scheduler->scheduler_lock);
// write worker current task
for (int i = 0; i < scheduler->n_workers; i++)
{
snprintf(buffer, MAX_NAME_SZ, "Worker: %d. Detail:\n", i);
ret = write(scheduler->stat_fd, buffer, strlen(buffer));
if(scheduler->workers[i].current_task)
{
antd_task_dump(scheduler->stat_fd, scheduler->workers[i].current_task, buffer);
}
}
ret = close(scheduler->stat_fd);
scheduler->stat_fd = -1;
usleep(5000);
}
else
{
ret = write(scheduler->stat_fd, ".", 1);
2021-01-03 11:24:55 +01:00
if (ret == -1)
{
ret = close(scheduler->stat_fd);
scheduler->stat_fd = -1;
usleep(5000);
}
else
{
ret = write(scheduler->stat_fd, "\b", 1);
}
}
}
else
{
2021-01-03 11:24:55 +01:00
ret = close(scheduler->stat_fd);
scheduler->stat_fd = -1;
}
break;
}
/* else
{
ret = write(scheduler->stat_fd, ".", 1);
if(ret == -1)
{
ret = close(scheduler->stat_fd);
scheduler->stat_fd = -1;
}
} */
2018-09-27 17:18:31 +02:00
}
2020-08-19 12:26:17 +02:00
return NULL;
2018-09-27 15:00:19 +02:00
}
2018-10-01 22:49:20 +02:00
2018-09-27 17:18:31 +02:00
/*
Main API methods
init the main scheduler
*/
2018-09-26 10:30:04 +02:00
2021-01-03 11:24:55 +01:00
antd_scheduler_t *antd_scheduler_init(int n, const char *stat_name)
2018-09-25 17:51:56 +02:00
{
2021-01-03 11:24:55 +01:00
antd_scheduler_t *scheduler = (antd_scheduler_t *)malloc(sizeof(antd_scheduler_t));
2018-10-01 22:49:20 +02:00
scheduler->n_workers = n;
scheduler->status = 1;
scheduler->workers_queue = NULL;
scheduler->pending_task = 0;
scheduler->stat_fd = -1;
2021-01-03 11:24:55 +01:00
scheduler->id_allocator = 0;
scheduler->stat_tid = 0;
int pid = getpid();
snprintf(scheduler->sched_name,MAX_NAME_SZ, "scheduler.%d",pid);
snprintf(scheduler->worker_hub,MAX_NAME_SZ, "worker.%d",pid);
(void)memset(scheduler->stat_fifo, 0, MAX_NAME_SZ);
2021-01-03 11:24:55 +01:00
if (stat_name)
{
(void)strncpy(scheduler->stat_fifo, stat_name, MAX_NAME_SZ - 1);
2021-01-03 11:24:55 +01:00
}
2018-10-10 12:42:47 +02:00
// init semaphore
scheduler->scheduler_sem = sem_open(scheduler->sched_name, O_CREAT, 0644, 0);
2018-10-14 11:19:06 +02:00
if (scheduler->scheduler_sem == SEM_FAILED)
2018-10-10 12:42:47 +02:00
{
ERROR("Cannot open semaphore for scheduler: %s", strerror(errno));
2021-01-03 11:24:55 +01:00
free(scheduler);
return NULL;
2018-10-10 12:42:47 +02:00
}
scheduler->worker_sem = sem_open(scheduler->worker_hub, O_CREAT, 0600, 0);
2018-10-14 10:57:09 +02:00
if (!scheduler->worker_sem)
2018-10-10 12:42:47 +02:00
{
2019-12-11 23:17:42 +01:00
ERROR("Cannot open semaphore for workers");
2021-01-03 11:24:55 +01:00
free(scheduler);
return NULL;
2018-10-10 12:42:47 +02:00
}
// init lock
pthread_mutex_init(&scheduler->scheduler_lock, NULL);
2018-10-01 22:49:20 +02:00
pthread_mutex_init(&scheduler->worker_lock, NULL);
pthread_mutex_init(&scheduler->pending_lock, NULL);
2021-01-03 11:24:55 +01:00
scheduler->task_queue = NULL;
2018-09-25 17:51:56 +02:00
// create scheduler.workers
if (n > 0)
2018-09-25 17:51:56 +02:00
{
scheduler->workers = (antd_worker_t *)malloc(n * (sizeof(antd_worker_t)));
if (!scheduler->workers)
2018-09-25 17:51:56 +02:00
{
2019-12-11 23:17:42 +01:00
ERROR("Cannot allocate memory for worker");
2021-01-03 11:24:55 +01:00
free(scheduler);
return NULL;
2018-09-26 10:30:04 +02:00
}
for (int i = 0; i < scheduler->n_workers; i++)
2018-09-26 10:30:04 +02:00
{
2018-10-10 12:42:47 +02:00
scheduler->workers[i].id = -1;
scheduler->workers[i].manager = (void *)scheduler;
scheduler->workers[i].current_task = NULL;
if (pthread_create(&scheduler->workers[i].tid, NULL, (void *(*)(void *))work, (void *)&scheduler->workers[i]) != 0)
2018-09-26 10:30:04 +02:00
{
2020-08-25 16:40:24 +02:00
ERROR("pthread_create: cannot create worker: %s", strerror(errno));
2021-01-03 11:24:55 +01:00
free(scheduler);
return NULL;
2018-09-26 10:30:04 +02:00
}
2018-10-10 12:42:47 +02:00
else
{
scheduler->workers[i].id = i;
}
2018-09-25 17:51:56 +02:00
}
}
// delete the fifo if any
if (scheduler->stat_fifo[0] != '\0')
{
LOG("Statistic fifo at: %s", scheduler->stat_fifo);
(void)remove(scheduler->stat_fifo);
// create the fifo file
if (mkfifo(scheduler->stat_fifo, 0666) == -1)
{
ERROR("Unable to create statistic FIFO %s: %s", scheduler->stat_fifo, strerror(errno));
}
else
{
if (pthread_create(&scheduler->stat_tid, NULL, (void *(*)(void *))statistic, scheduler) != 0)
{
ERROR("pthread_create: cannot create statistic thread: %s", strerror(errno));
scheduler->stat_tid = 0;
}
}
}
2019-12-11 23:17:42 +01:00
LOG("Antd scheduler initialized with %d worker", scheduler->n_workers);
2021-01-03 11:24:55 +01:00
return scheduler;
}
static void destroy_task(void *data)
{
antd_task_t *task = (antd_task_t *)data;
2021-01-03 21:31:35 +01:00
if (!task)
return;
if (task->callback)
{
2021-01-03 11:24:55 +01:00
free_callback(task->callback);
2021-01-03 21:31:35 +01:00
task->callback = NULL;
}
if (task->events)
{
destroy_queue(task->events, 0);
task->events = NULL;
}
2021-01-03 11:24:55 +01:00
if (task)
free(task);
2018-09-25 17:51:56 +02:00
}
2021-01-03 11:24:55 +01:00
2018-09-25 17:51:56 +02:00
/*
destroy all pending task
2018-09-27 15:00:19 +02:00
pthread_mutex_lock(&scheduler.queue_lock);
2018-09-25 17:51:56 +02:00
*/
void antd_scheduler_destroy(antd_scheduler_t *scheduler)
2018-09-25 17:51:56 +02:00
{
2021-01-03 11:24:55 +01:00
if (!scheduler)
return;
2018-09-25 17:51:56 +02:00
// free all the chains
2018-10-01 22:49:20 +02:00
stop(scheduler);
2019-12-11 23:17:42 +01:00
LOG("Destroy remaining queue");
2021-01-03 11:24:55 +01:00
bst_free_cb(scheduler->task_queue, destroy_task);
scheduler->task_queue = NULL;
2021-01-03 21:31:35 +01:00
destroy_queue(scheduler->workers_queue, 1);
2021-01-03 11:24:55 +01:00
free(scheduler);
2018-09-25 17:51:56 +02:00
}
/*
create a task
*/
2021-01-03 21:31:35 +01:00
antd_task_t *antd_create_task(void *(*handle)(void *), void *data, void *(*callback)(void *), time_t atime)
2018-09-25 17:51:56 +02:00
{
antd_task_t *task = (antd_task_t *)malloc(sizeof(antd_task_t));
2018-09-25 17:51:56 +02:00
task->stamp = (unsigned long)time(NULL);
task->data = data;
task->handle = handle;
2021-01-03 11:24:55 +01:00
task->id = antd_task_data_id(data);
2018-09-25 17:51:56 +02:00
task->callback = callback_of(callback);
2019-07-31 15:11:59 +02:00
task->access_time = atime;
2021-01-03 21:31:35 +01:00
task->events = NULL;
2018-09-25 17:51:56 +02:00
return task;
}
/*
scheduling a task
*/
2021-01-03 11:24:55 +01:00
void antd_scheduler_add_task(antd_scheduler_t *scheduler, antd_task_t *task)
2018-09-25 17:51:56 +02:00
{
2021-01-03 21:31:35 +01:00
if (task->id == 0)
2021-01-03 11:24:55 +01:00
task->id = antd_scheduler_next_id(scheduler, task->id);
2018-10-01 22:49:20 +02:00
pthread_mutex_lock(&scheduler->scheduler_lock);
2021-01-03 11:24:55 +01:00
scheduler->task_queue = bst_insert(scheduler->task_queue, task->id, (void *)task);
2018-10-01 22:49:20 +02:00
pthread_mutex_unlock(&scheduler->scheduler_lock);
pthread_mutex_lock(&scheduler->pending_lock);
scheduler->pending_task++;
pthread_mutex_unlock(&scheduler->pending_lock);
2018-10-10 12:42:47 +02:00
// wake up the scheduler if idle
2018-10-14 10:57:09 +02:00
sem_post(scheduler->scheduler_sem);
2018-09-25 17:51:56 +02:00
}
2021-01-03 11:24:55 +01:00
static void antd_execute_task(antd_scheduler_t *scheduler, antd_task_t *task)
2018-09-25 17:51:56 +02:00
{
2021-01-03 11:24:55 +01:00
if (!task)
2018-10-01 22:49:20 +02:00
return;
2018-09-25 17:51:56 +02:00
// execute the task
2021-01-03 11:24:55 +01:00
void *ret = (*(task->handle))(task->data);
2018-09-25 17:51:56 +02:00
// check the return data if it is a new task
if (!ret)
2018-09-25 17:51:56 +02:00
{
// call the first callback
2021-01-03 11:24:55 +01:00
execute_callback(scheduler, task);
2018-09-25 17:51:56 +02:00
}
else
{
antd_task_t *rtask = (antd_task_t *)ret;
2021-01-03 11:24:55 +01:00
if (task->callback)
{
if (rtask->callback)
2018-09-25 17:51:56 +02:00
{
2021-01-03 11:24:55 +01:00
enqueue_callback(rtask->callback, task->callback);
2018-09-25 17:51:56 +02:00
}
else
{
2021-01-03 11:24:55 +01:00
rtask->callback = task->callback;
2018-09-25 17:51:56 +02:00
}
2021-01-03 21:31:35 +01:00
task->callback = NULL;
2018-09-25 17:51:56 +02:00
}
if (!rtask->handle)
2018-09-25 17:51:56 +02:00
{
// call the first callback
2018-10-01 22:49:20 +02:00
execute_callback(scheduler, rtask);
2021-01-03 21:31:35 +01:00
destroy_task(task);
2018-09-25 17:51:56 +02:00
}
else
{
2021-01-03 11:24:55 +01:00
antd_scheduler_add_task(scheduler, rtask);
2021-01-03 21:31:35 +01:00
destroy_task(task);
2018-09-25 17:51:56 +02:00
}
}
}
2018-09-27 17:18:31 +02:00
int antd_scheduler_busy(antd_scheduler_t *scheduler)
2018-09-25 17:51:56 +02:00
{
2018-10-01 22:49:20 +02:00
return scheduler->pending_task != 0;
2018-09-25 17:51:56 +02:00
}
2018-10-01 22:49:20 +02:00
2021-01-03 11:24:55 +01:00
void antd_scheduler_lock(antd_scheduler_t *sched)
{
pthread_mutex_lock(&sched->scheduler_lock);
}
void antd_scheduler_unlock(antd_scheduler_t *sched)
{
pthread_mutex_unlock(&sched->scheduler_lock);
}
2021-01-03 21:31:35 +01:00
static void antd_task_schedule(antd_scheduler_t *scheduler, antd_task_t *task)
2018-09-27 15:00:19 +02:00
{
2019-07-31 15:11:59 +02:00
// no task
2021-01-03 11:24:55 +01:00
if (!task)
2018-09-27 15:00:19 +02:00
{
2021-01-03 21:31:35 +01:00
return;
2018-09-27 15:00:19 +02:00
}
pthread_mutex_lock(&scheduler->pending_lock);
scheduler->pending_task--;
pthread_mutex_unlock(&scheduler->pending_lock);
2018-09-27 15:00:19 +02:00
// has the task now
2019-07-31 15:11:59 +02:00
// validate the task
2021-01-03 11:24:55 +01:00
//if (scheduler->validate_data && difftime(time(NULL), it->task->access_time) > MAX_VALIDITY_INTERVAL && it->task->priority == N_PRIORITY - 1)
if (antd_scheduler_validate_data(task) == 0)
2019-07-31 15:11:59 +02:00
{
// data task is not valid
LOG("Task is no longer valid and will be killed");
2021-01-03 11:24:55 +01:00
antd_scheduler_destroy_data(task->data);
2021-01-03 21:31:35 +01:00
destroy_task(task);
return;
2019-07-31 15:11:59 +02:00
}
2021-01-03 21:31:35 +01:00
if (scheduler->n_workers <= 0)
2018-09-27 15:00:19 +02:00
{
// do it by myself
2021-01-03 11:24:55 +01:00
antd_execute_task(scheduler, task);
2018-09-27 15:00:19 +02:00
}
else
{
// delegate to other workers by
//pushing to the worker queue
2018-10-01 22:49:20 +02:00
pthread_mutex_lock(&scheduler->worker_lock);
2021-01-03 11:24:55 +01:00
enqueue(&scheduler->workers_queue, task);
2018-10-01 22:49:20 +02:00
pthread_mutex_unlock(&scheduler->worker_lock);
2018-10-10 12:42:47 +02:00
// wake up idle worker
2018-10-14 10:57:09 +02:00
sem_post(scheduler->worker_sem);
2018-09-27 15:00:19 +02:00
}
2018-10-10 12:42:47 +02:00
}
2021-01-03 21:31:35 +01:00
static void task_polls_collect(bst_node_t* node, void** argv, int argc)
{
UNUSED(argc);
antd_task_evt_item_t* it = (antd_task_evt_item_t*)node->data;
struct pollfd* pfds = (struct pollfd*)argv[0];
if(it)
{
pfds[node->key].fd = it->fd;
if(it->flags & TASK_EVT_ON_READABLE)
{
pfds[node->key].events |= POLLIN;
}
if(it->flags & TASK_EVT_ON_WRITABLE)
{
pfds[node->key].events |= POLLOUT;
}
}
}
static void antd_deploy_task(bst_node_t* node, void** argv, int argc)
{
UNUSED(argc);
if(!node || !node->data)
return;
antd_scheduler_t* sched = (antd_scheduler_t*) argv[0];
antd_task_t* task = node->data;
pthread_mutex_lock(&sched->scheduler_lock);
sched->task_queue = bst_delete(sched->task_queue, task->id);
pthread_mutex_unlock(&sched->scheduler_lock);
antd_task_schedule(sched, task);
}
2021-01-03 21:31:35 +01:00
static void task_event_collect(bst_node_t* node, void** argv, int argc)
2018-10-10 12:42:47 +02:00
{
2021-01-03 21:31:35 +01:00
UNUSED(argc);
antd_task_t* task = (antd_task_t*) node->data;
bst_node_t** exec_list = (bst_node_t**) argv[0];
2021-01-03 21:31:35 +01:00
bst_node_t** poll_list = (bst_node_t**) argv[1];
struct timeval now;
2021-01-03 21:31:35 +01:00
int* pollsize = (int*) argv[2];
if(!task->events)
{
*exec_list = bst_insert(*exec_list,task->id, task);
2021-01-03 21:31:35 +01:00
return;
}
antd_queue_item_t it = task->events;
while(it)
{
if((it->evt->flags & TASK_EVT_ALWAY_ON) || antd_scheduler_validate_data(task) == 0 )
2021-01-03 21:31:35 +01:00
{
*exec_list = bst_insert(*exec_list,task->id, task);
2021-01-03 21:31:35 +01:00
}
else if(it->evt->flags & TASK_EVT_ON_TIMEOUT)
{
// check if timeout
gettimeofday(&now, NULL);
//do stuff
int diff = (int)(((now.tv_sec - it->evt->stamp.tv_sec) * 1000000 + now.tv_usec - it->evt->stamp.tv_usec) / 1000);
if( diff >= it->evt->timeout )
2021-01-03 21:31:35 +01:00
{
*exec_list = bst_insert(*exec_list,task->id, task);
2021-01-03 21:31:35 +01:00
}
}
else
{
*poll_list = bst_insert(*poll_list, *pollsize, it->evt);
*pollsize = (*pollsize)+1;
}
it = it->next;
}
}
void antd_task_bind_event(antd_task_t *task, int fd, int timeout, int flags)
{
antd_task_evt_item_t *eit = (antd_task_evt_item_t *)malloc(sizeof(antd_task_evt_item_t));
eit->fd = fd;
eit->timeout = timeout;
eit->flags = flags;
eit->task = task;
gettimeofday(&eit->stamp, NULL);
2021-01-03 21:31:35 +01:00
enqueue(&task->events, eit);
}
void *antd_scheduler_wait(void *ptr)
{
int pollsize, ready;
void *argv[3];
//antd_queue_t exec_list = NULL;
2021-01-03 21:31:35 +01:00
bst_node_t* poll_list = NULL;
bst_node_t* scheduled_list = NULL;
2021-01-03 21:31:35 +01:00
antd_task_evt_item_t *eit = NULL;
2021-01-22 01:12:26 +01:00
bst_node_t* node, *task_node = NULL;
2021-01-03 21:31:35 +01:00
struct pollfd *pfds = NULL;
antd_scheduler_t *scheduler = (antd_scheduler_t *)ptr;
while (scheduler->status)
2018-10-10 12:42:47 +02:00
{
2021-01-03 21:31:35 +01:00
pollsize = 0;
argv[0] = &scheduled_list;
2021-01-03 21:31:35 +01:00
argv[1] = &poll_list;
argv[2] = &pollsize;
pthread_mutex_lock(&scheduler->scheduler_lock);
bst_for_each(scheduler->task_queue, task_event_collect, argv, 3);
pthread_mutex_unlock(&scheduler->scheduler_lock);
// schedule exec list first
/*it = exec_list;
2021-01-03 21:31:35 +01:00
while(it)
{
if(it->task)
{
pthread_mutex_lock(&scheduler->scheduler_lock);
scheduler->task_queue = bst_delete(scheduler->task_queue, it->task->id);
pthread_mutex_unlock(&scheduler->scheduler_lock);
antd_task_schedule(scheduler, it->task);
}
curr = it;
it = it->next;
free(curr);
}*/
2021-01-03 21:31:35 +01:00
// Detect event on pollist
if(pollsize > 0)
{
pfds = (struct pollfd*)malloc(pollsize*sizeof(struct pollfd));
memset(pfds, 0, pollsize*sizeof(struct pollfd));
if(pfds)
{
argv[0] = pfds;
bst_for_each(poll_list,task_polls_collect, argv, 1);
// now poll event
ready = poll(pfds, pollsize, POLL_EVENT_TO);
if(ready == -1)
{
// this should not happends
ERROR("Unable to poll: %s", strerror(errno));
// TODO: exit ?
}
else if(ready > 0)
{
for (int i = 0; i < pollsize; i++)
{
// find the event
2021-01-22 01:12:26 +01:00
task_node = NULL;
eit = NULL;
2021-01-03 21:31:35 +01:00
node = bst_find(poll_list,i);
if(node)
{
2021-01-03 21:31:35 +01:00
eit = (antd_task_evt_item_t *)node->data;
}
2021-01-03 21:31:35 +01:00
if(eit)
{
if( ((eit->flags & TASK_EVT_ON_READABLE) && (pfds[i].revents & POLLIN))
|| ( (eit->flags & TASK_EVT_ON_WRITABLE) && (pfds[i].revents & POLLOUT))
) {
// event triggered schedule the task
pthread_mutex_lock(&scheduler->scheduler_lock);
2021-01-22 01:12:26 +01:00
task_node = bst_find(scheduler->task_queue, eit->task->id);
2021-01-03 21:31:35 +01:00
pthread_mutex_unlock(&scheduler->scheduler_lock);
2021-01-22 01:12:26 +01:00
if(task_node)
scheduled_list = bst_insert(scheduled_list, eit->task->id, eit->task);
//antd_task_schedule(scheduler, eit->task);
2021-01-03 21:31:35 +01:00
}
else if( (pfds[i].revents & POLLERR) || (pfds[i].revents & POLLHUP)) {
2021-01-03 21:31:35 +01:00
// task is no longer available
ERROR("Poll: Task %d is no longer valid. Remove it", eit->task->id);
eit->task->access_time = 0;
eit->task->handle = NULL;
/*
2021-01-03 21:31:35 +01:00
antd_scheduler_destroy_data(eit->task->data);
eit->task->data = NULL;*/
scheduled_list = bst_insert(scheduled_list, eit->task->id, eit->task);
2021-01-03 21:31:35 +01:00
}
}
}
}
free(pfds);
}
}
if(scheduled_list)
{
argv[0] = scheduler;
bst_for_each(scheduled_list, antd_deploy_task, argv, 1);
bst_free(scheduled_list);
scheduled_list = NULL;
}
2021-01-03 21:31:35 +01:00
bst_free(poll_list);
poll_list = NULL;
if (!scheduler->task_queue)
2018-10-10 12:42:47 +02:00
{
2021-01-22 01:12:26 +01:00
// reset id allocator
//scheduler->id_allocator=0;
2018-10-10 12:42:47 +02:00
// no task found, go to idle state
2018-10-14 10:57:09 +02:00
sem_wait(scheduler->scheduler_sem);
2018-10-10 12:42:47 +02:00
}
}
2021-01-03 11:24:55 +01:00
return NULL;
}
int antd_scheduler_ok(antd_scheduler_t *scheduler)
{
return scheduler->status;
}
int antd_scheduler_next_id(antd_scheduler_t *sched, int input)
{
int id = input;
if(sched->id_allocator < 0)
{
sched->id_allocator = 0;
}
2021-01-03 11:24:55 +01:00
pthread_mutex_lock(&sched->scheduler_lock);
if (id == 0)
{
sched->id_allocator++;
2021-01-03 11:24:55 +01:00
id = sched->id_allocator;
}
while (bst_find(sched->task_queue, id) != NULL)
{
2021-01-22 01:12:26 +01:00
sched->id_allocator++;
2021-01-03 11:24:55 +01:00
id = sched->id_allocator;
}
pthread_mutex_unlock(&sched->scheduler_lock);
return id;
}
2021-01-03 11:24:55 +01:00
void antd_scheduler_ext_statistic(int fd, void *data)
{
UNUSED(fd);
UNUSED(data);
}
int antd_scheduler_validate_data(antd_task_t *task)
{
2021-01-03 21:31:35 +01:00
UNUSED(task);
2021-01-05 19:00:51 +01:00
return !(difftime(time(NULL), task->access_time) > MAX_VALIDITY_INTERVAL);
2021-01-03 11:24:55 +01:00
}
void antd_scheduler_destroy_data(void *data)
{
UNUSED(data);
}
int antd_task_data_id(void *data)
{
UNUSED(data);
intptr_t ptr = (intptr_t)data;
return (int)ptr;
}