alpha version of proxy

This commit is contained in:
lxsang
2021-01-26 23:36:24 +01:00
parent 31accd9060
commit c590898e03
6 changed files with 281 additions and 54 deletions

View File

@ -603,11 +603,11 @@ int antd_recv_upto(void *src, void *data, int len)
time(&source->last_io);
return received;
}
if (received == 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
if (received <= 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
{
return -1;
return 0;
}
return 0;
return -1;
#ifdef USE_OPENSSL
}
#endif

View File

@ -25,6 +25,8 @@
#define ANTD_CLIENT_PROTO_CHECK 0x4
#define ANTD_CLIENT_RESOLVE_REQUEST 0x5
#define ANTD_CLIENT_SERVE_FILE 0x6
#define ANTD_CLIENT_RQ_DATA_DECODE 0x7
#define ANTD_CLIENT_PROXY_MONITOR 0x8
typedef enum
{

View File

@ -10,7 +10,7 @@
#include "utils.h"
#include "bst.h"
#define MAX_VALIDITY_INTERVAL 60 // 1minute
#define MAX_VALIDITY_INTERVAL 30 // s
#define MAX_FIFO_NAME_SZ 255
// callback definition
@ -644,33 +644,36 @@ static void antd_deploy_task(bst_node_t* node, void** argv, int argc)
return;
antd_scheduler_t* sched = (antd_scheduler_t*) argv[0];
antd_task_t* task = node->data;
pthread_mutex_lock(&sched->scheduler_lock);
sched->task_queue = bst_delete(sched->task_queue, task->id);
pthread_mutex_unlock(&sched->scheduler_lock);
antd_task_schedule(sched, task);
}
static void task_event_collect(bst_node_t* node, void** argv, int argc)
{
UNUSED(argc);
antd_task_t* task = (antd_task_t*) node->data;
antd_queue_t* exec_list = (antd_queue_t*) argv[0];
bst_node_t** exec_list = (bst_node_t**) argv[0];
bst_node_t** poll_list = (bst_node_t**) argv[1];
int* pollsize = (int*) argv[2];
if(!task->events)
{
enqueue(exec_list, task);
*exec_list = bst_insert(*exec_list,task->id, task);
return;
}
antd_queue_item_t it = task->events;
while(it)
{
if(it->evt->flags & TASK_EVT_ALWAY_ON)
if((it->evt->flags & TASK_EVT_ALWAY_ON) || antd_scheduler_validate_data(task) == 0 )
{
enqueue(exec_list, task);
*exec_list = bst_insert(*exec_list,task->id, task);
}
else if(it->evt->flags & TASK_EVT_ON_TIMEOUT)
{
// check if timeout
if(difftime(time(NULL),task->stamp) > it->evt->timeout )
{
enqueue(exec_list, task);
*exec_list = bst_insert(*exec_list,task->id, task);
}
}
else
@ -696,7 +699,7 @@ void *antd_scheduler_wait(void *ptr)
{
int pollsize, ready;
void *argv[3];
antd_queue_t exec_list = NULL;
//antd_queue_t exec_list = NULL;
bst_node_t* poll_list = NULL;
bst_node_t* scheduled_list = NULL;
antd_queue_item_t it = NULL;
@ -709,14 +712,14 @@ void *antd_scheduler_wait(void *ptr)
while (scheduler->status)
{
pollsize = 0;
argv[0] = &exec_list;
argv[0] = &scheduled_list;
argv[1] = &poll_list;
argv[2] = &pollsize;
pthread_mutex_lock(&scheduler->scheduler_lock);
bst_for_each(scheduler->task_queue, task_event_collect, argv, 3);
pthread_mutex_unlock(&scheduler->scheduler_lock);
// schedule exec list first
it = exec_list;
/*it = exec_list;
while(it)
{
if(it->task)
@ -730,7 +733,7 @@ void *antd_scheduler_wait(void *ptr)
curr = it;
it = it->next;
free(curr);
}
}*/
// Detect event on pollist
if(pollsize > 0)
{
@ -768,37 +771,33 @@ void *antd_scheduler_wait(void *ptr)
// event triggered schedule the task
pthread_mutex_lock(&scheduler->scheduler_lock);
task_node = bst_find(scheduler->task_queue, eit->task->id);
if(task_node)
scheduler->task_queue = bst_delete(scheduler->task_queue, eit->task->id);
pthread_mutex_unlock(&scheduler->scheduler_lock);
if(task_node)
scheduled_list = bst_insert(scheduled_list, eit->task->id, eit->task);
//antd_task_schedule(scheduler, eit->task);
}
else if( (pfds[i].revents & POLLERR) || (pfds[i].revents & POLLHUP) ) {
else if( (pfds[i].revents & POLLERR) || (pfds[i].revents & POLLHUP)) {
// task is no longer available
ERROR("Poll: Task %d is no longer valid. Remove it", eit->task->id);
// remove task from task queue
pthread_mutex_lock(&scheduler->scheduler_lock);
scheduler->task_queue = bst_delete(scheduler->task_queue, eit->task->id);
pthread_mutex_unlock(&scheduler->scheduler_lock);
eit->task->access_time = 0;
eit->task->handle = NULL;
antd_scheduler_destroy_data(eit->task->data);
destroy_task(eit->task);
eit->task->data = NULL;
scheduled_list = bst_insert(scheduled_list, eit->task->id, eit->task);
}
}
}
if(scheduled_list)
{
argv[0] = scheduler;
bst_for_each(scheduled_list, antd_deploy_task, argv, 1);
bst_free(scheduled_list);
scheduled_list = NULL;
}
}
free(pfds);
}
}
exec_list = NULL;
if(scheduled_list)
{
argv[0] = scheduler;
bst_for_each(scheduled_list, antd_deploy_task, argv, 1);
bst_free(scheduled_list);
scheduled_list = NULL;
}
bst_free(poll_list);
poll_list = NULL;

View File

@ -11,7 +11,7 @@
#define TASK_EVT_ON_READABLE 0x02
#define TASK_EVT_ON_WRITABLE 0x04
#define TASK_EVT_ON_TIMEOUT 0x08
#define POLL_EVENT_TO 100 // ms
#define POLL_EVENT_TO 50 // ms
typedef struct _antd_scheduler_t antd_scheduler_t;
typedef struct _antd_callback_t antd_callback_t;