From c590898e03c2795df6fd6349f04a0780a761c609 Mon Sep 17 00:00:00 2001 From: lxsang Date: Tue, 26 Jan 2021 23:36:24 +0100 Subject: [PATCH] alpha version of proxy --- http_server.c | 267 +++++++++++++++++++++++++++++++++++++++++++----- httpd.c | 7 ++ lib/handle.c | 6 +- lib/handle.h | 2 + lib/scheduler.c | 51 +++++---- lib/scheduler.h | 2 +- 6 files changed, 281 insertions(+), 54 deletions(-) diff --git a/http_server.c b/http_server.c index f2d472a..6615820 100644 --- a/http_server.c +++ b/http_server.c @@ -1,4 +1,6 @@ -//#define _GNU_SOURCE +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif #include #include #include @@ -304,7 +306,7 @@ void *accept_request(void *data) antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE | TASK_EVT_ON_READABLE); // first verify if the socket is ready antd_client_t *client = (antd_client_t *)rq->client; - + struct pollfd pfd[1]; pfd[0].fd = client->sock; @@ -316,7 +318,7 @@ void *accept_request(void *data) antd_error(rq->client, 400, "Bad request"); return task; } - if(pfd[0].revents & POLLERR || pfd[0].revents & POLLHUP) + if (pfd[0].revents & POLLERR || pfd[0].revents & POLLHUP) { antd_error(rq->client, 400, "Bad request"); return task; @@ -410,7 +412,6 @@ void *resolve_request(void *data) char path[2 * BUFFLEN]; antd_request_t *rq = (antd_request_t *)data; antd_task_t *task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); - antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE | TASK_EVT_ON_READABLE); char *url = (char *)dvalue(rq->request, "RESOURCE_PATH"); char *newurl = NULL; char *rqp = NULL; @@ -482,7 +483,6 @@ void *resolve_request(void *data) if (strcmp(mime_type, "application/octet-stream") == 0) { char *ex = ext(path); - //printf("Path: %s\n", path); char *h = NULL; if (ex) { @@ -523,6 +523,7 @@ void *resolve_request(void *data) } } } + antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE); task->handle = serve_file; } return task; @@ -531,6 +532,8 @@ void *resolve_request(void *data) void *finish_request(void *data) { + if (!data) + return NULL; destroy_request(data); server_config.connection--; LOG("Remaining connection %d", server_config.connection); @@ -566,7 +569,6 @@ int rule_check(const char *k, const char *v, const char *host, const char *_url, } tmp = (char *)v; char *search = "<([a-zA-Z0-9]+)>"; - //printf("match again %s\n",tmp); while ((ret = regex_match(search, tmp, 2, val_matches))) { memcpy(buf + idx, tmp, val_matches[1].rm_so - 1); @@ -616,7 +618,6 @@ void *serve_file(void *data) { antd_request_t *rq = (antd_request_t *)data; antd_task_t *task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); - antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE | TASK_EVT_ON_READABLE); char *path = (char *)dvalue(rq->request, "ABS_RESOURCE_PATH"); char *mime_type = (char *)dvalue(rq->request, "RESOURCE_MIME"); rq->client->state = ANTD_CLIENT_SERVE_FILE; @@ -737,7 +738,6 @@ char *apply_rules(dictionary_t rules, const char *host, char *url) { k = it->key; v = (char *)it->value; - //printf("[%s]: [%s]\n", k, v); // 1 group if (rule_check(k, v, host, url, query_string, url)) { @@ -755,6 +755,198 @@ char *apply_rules(dictionary_t rules, const char *host, char *url) return strdup(query_string); } + +static void *proxy_monitor(void *data) +{ + antd_request_t *rq = (antd_request_t *)data; + rq->client->state = ANTD_CLIENT_PROXY_MONITOR; + antd_client_t *proxy = (antd_client_t *)dvalue(rq->request, "PROXY_HANDLE"); + antd_task_t *task = antd_create_task(NULL, data, NULL, rq->client->last_io); + int ret, sz1, sz2; + char *buf = NULL; + buf = (char *)malloc(BUFFLEN); + struct pollfd pfd[1]; + memset(pfd, 0, sizeof(pfd)); + pfd[0].fd = proxy->sock; + pfd[0].events = POLLIN; + + ret = 1; + sz1 = antd_recv_upto(rq->client, buf, BUFFLEN); + + if ((sz1 < 0) || (sz1 > 0 && antd_send(proxy, buf, sz1) != sz1)) + { + ret = 0; + } + + if (poll(pfd, 1, 0) < 0) + { + ret = 0; + } + sz2 = antd_recv_upto(proxy, buf, BUFFLEN); + if (sz2 < 0 || (sz2 > 0 && antd_send(rq->client, buf, sz2) != sz2)) + { + ret = 0; + } + free(buf); + + if (ret == 0) + { + (void)close(proxy->sock); + return task; + } + + if (sz2 == 0) + { + if ( + pfd[0].revents & POLLERR || + pfd[0].revents & POLLRDHUP || + pfd[0].revents & POLLHUP || + pfd[0].revents & POLLNVAL || + pfd[0].revents & POLLIN) + { + (void)close(proxy->sock); + return task; + } + usleep(10000); + } + + task->handle = proxy_monitor; + task->access_time = rq->client->last_io; + + antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_READABLE); + antd_task_bind_event(task, proxy->sock, 0, TASK_EVT_ON_READABLE | TASK_EVT_ON_WRITABLE); + return task; +} + +static void *proxify(void *data) +{ + int sock_fd, size, ret; + char *str = NULL; + chain_t it; + antd_request_t *rq = (antd_request_t *)data; + antd_client_t *proxy = NULL; + rq->client->state = ANTD_CLIENT_RESOLVE_REQUEST; + char *host = dvalue(rq->request, "PROXY_HOST"); + int port = atoi(dvalue(rq->request, "PROXY_PORT")); + char *path = dvalue(rq->request, "PROXY_PATH"); + char *query = dvalue(rq->request, "PROXY_QUERY"); + dictionary_t xheader = dvalue(rq->request, "REQUEST_HEADER"); + antd_task_t *task = antd_create_task(NULL, data, NULL, rq->client->last_io); + if (!xheader) + { + antd_error(rq->client, 400, "Badd Request"); + return task; + } + sock_fd = request_socket(ip_from_hostname(host), port); + if (sock_fd == -1) + { + antd_error(rq->client, 503, "Service Unavailable"); + return task; + } + set_nonblock(sock_fd); + /*struct timeval timeout; + timeout.tv_sec = 2; + timeout.tv_usec = 0; //POLL_EVENT_TO*1000; + if (setsockopt(sock_fd, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout, sizeof(timeout)) < 0) + { + ERROR("setsockopt failed:%s", strerror(errno)); + antd_error(rq->client, 500, "Internal proxy error"); + (void)close(sock_fd); + return task; + } + + if (setsockopt(sock_fd, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout, sizeof(timeout)) < 0) + { + ERROR("setsockopt failed:%s", strerror(errno)); + antd_error(rq->client, 500, "Internal proxy error"); + (void)close(sock_fd); + return task; + }*/ + + proxy = (antd_client_t *)malloc(sizeof(antd_client_t)); + proxy->sock = sock_fd; + proxy->ssl = NULL; + proxy->zstream = NULL; + proxy->z_level = ANTD_CNONE; + + // store content length here + dput(rq->request, "PROXY_HANDLE", proxy); + + str = __s("%s %s?%s HTTP/1.1\r\n", (char *)dvalue(rq->request, "METHOD"), path, query); + size = strlen(str); + ret = antd_send(proxy, str, size); + free(str); + if (ret != size) + { + antd_error(rq->client, 500, ""); + (void)close(sock_fd); + return task; + } + for_each_assoc(it, xheader) + { + str = __s("%s: %s\r\n", it->key, (char *)it->value); + size = strlen(str); + ret = antd_send(proxy, str, size); + free(str); + if (ret != size) + { + antd_error(rq->client, 500, ""); + (void)close(sock_fd); + return task; + } + } + (void)antd_send(proxy, "\r\n", 2); + // now monitor the proxy + task->handle = proxy_monitor; + task->access_time = rq->client->last_io; + // register event + + antd_task_bind_event(task, proxy->sock, 0, TASK_EVT_ON_READABLE | TASK_EVT_ON_WRITABLE); + return task; +} +/** + * Check if the current request is e reverse proxy + * return a proxy task if this is the case +*/ +static void *check_proxy(antd_request_t *rq, const char *path, const char *query) +{ + char *pattern = "^(https?)://([^:]+):([0-9]+)(.*)$"; + antd_task_t *task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); + char buff[256]; + regmatch_t matches[5]; + int ret, size; + ret = regex_match(pattern, path, 5, matches); + + if (!ret) + { + return NULL; + } + + if (matches[1].rm_eo - matches[1].rm_so == 5) + { + // https is not supported for now + // TODO add https support + antd_error(rq->client, 503, "Service Unavailable"); + return task; + } + // http proxy request + size = matches[2].rm_eo - matches[2].rm_so < (int)sizeof(buff) ? matches[2].rm_eo - matches[2].rm_so : (int)sizeof(buff); + (void)memcpy(buff, path + matches[2].rm_so, size); + buff[size] = '\0'; + dput(rq->request, "PROXY_HOST", strdup(buff)); + + size = matches[3].rm_eo - matches[3].rm_so < (int)sizeof(buff) ? matches[3].rm_eo - matches[3].rm_so : (int)sizeof(buff); + (void)memcpy(buff, path + matches[3].rm_so, size); + buff[size] = '\0'; + dput(rq->request, "PROXY_PORT", strdup(buff)); + + dput(rq->request, "PROXY_PATH", strdup(path + matches[4].rm_so)); + dput(rq->request, "PROXY_QUERY", strdup(query)); + + task->handle = proxify; + antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_READABLE | TASK_EVT_ON_WRITABLE); + return task; +} /** * Decode the HTTP request header */ @@ -811,10 +1003,16 @@ void *decode_request_header(void *data) antd_error(rq->client, 413, "Payload Too Large"); ERROR("Header size too large (%d): %d vs %d", rq->client->sock, header_size, HEADER_MAX_SIZE); task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); - antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE | TASK_EVT_ON_READABLE); return task; } } + if (ret == 0) + { + //antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE | TASK_EVT_ON_READABLE); + task = antd_create_task(decode_request_header, (void *)rq, NULL, rq->client->last_io); + antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_READABLE); + return task; + } // check for content length size line = (char *)dvalue(xheader, "Content-Length"); if (line) @@ -827,7 +1025,6 @@ void *decode_request_header(void *data) // 100 ms sleep usleep(100000); task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); - antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE | TASK_EVT_ON_READABLE); return task; } } @@ -865,6 +1062,16 @@ void *decode_request_header(void *data) dput(rq->request, "COOKIE", cookie); if (host) free(host); + + // check if this is a reverse proxy ? + task = check_proxy(rq, buf, query); + if (task) + { + if (query) + free(query); + return task; + } + dput(rq->request, "RESOURCE_PATH", url_decode(buf)); if (query) { @@ -873,7 +1080,7 @@ void *decode_request_header(void *data) } // header ok, now checkmethod task = antd_create_task(decode_request, (void *)rq, NULL, rq->client->last_io); - antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE | TASK_EVT_ON_READABLE); + antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE | TASK_EVT_ON_READABLE); // return task; } @@ -892,8 +1099,8 @@ void *decode_request(void *data) ws = 1; method = (char *)dvalue(rq->request, "METHOD"); task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); - antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE | TASK_EVT_ON_READABLE); - if (strcmp(method, "GET") == 0 || strcmp(method, "HEAD") == 0 || strcmp(method, "OPTIONS") == 0) + //antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE | TASK_EVT_ON_READABLE); + if (EQU(method, "GET")) { //if(ctype) free(ctype); if (ws && ws_key != NULL) @@ -908,7 +1115,12 @@ void *decode_request(void *data) task->handle = resolve_request; return task; } - else if (strcmp(method, "POST") == 0) + else if(EQU(method, "HEAD") || EQU(method, "OPTIONS") || EQU(method, "DELETE") ) + { + task->handle = resolve_request; + return task; + } + else if (EQU(method, "POST") || EQU(method, "PUT") || EQU(method, "PATCH")) { task->handle = resolve_request; return task; @@ -923,6 +1135,7 @@ void *decode_request(void *data) void *decode_post_request(void *data) { antd_request_t *rq = (antd_request_t *)data; + rq->client->state = ANTD_CLIENT_RQ_DATA_DECODE; dictionary_t request = dvalue(rq->request, "REQUEST_DATA"); dictionary_t headers = dvalue(rq->request, "REQUEST_HEADER"); char *ctype = NULL; @@ -935,8 +1148,8 @@ void *decode_post_request(void *data) clen = atoi(tmp); char *method = (char *)dvalue(rq->request, "METHOD"); task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); - antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE | TASK_EVT_ON_READABLE); - if (!method || strcmp(method, "POST") != 0) + //antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE | TASK_EVT_ON_READABLE); + if (!method || (!EQU(method, "POST") && !EQU(method, "PUT") && EQU(method, "PATCH"))) return task; if (ctype == NULL || clen == -1) { @@ -954,7 +1167,10 @@ void *decode_post_request(void *data) } else if (clen > 0) { + // WARN: this may not work on ssl socket + antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_READABLE | TASK_EVT_ON_WRITABLE); task->handle = decode_post_request; + return task; } } else if (strstr(ctype, FORM_MULTI_PART)) @@ -965,7 +1181,6 @@ void *decode_post_request(void *data) else { char *pquery = post_data_decode(rq->client, clen); - //printf("POST: %s\n", pquery); char *key = strstr(ctype, "/"); if (key) key++; @@ -979,8 +1194,11 @@ void *decode_post_request(void *data) else if (clen > 0) { task->handle = decode_post_request; + antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_READABLE | TASK_EVT_ON_WRITABLE); + return task; } } + antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE); return task; } @@ -1000,7 +1218,6 @@ void ws_confirm_request(void *client, const char *key) if (n < 0) n = 0; strncat(rkey, WS_MAGIC_STRING, n); - //printf("RESPONDKEY '%s'\n", rkey); #ifdef USE_OPENSSL SHA_CTX context; #else @@ -1011,7 +1228,6 @@ void ws_confirm_request(void *client, const char *key) SHA1_Update(&context, rkey, strlen(rkey)); SHA1_Final((uint8_t *)sha_d, &context); Base64encode(base64, sha_d, 20); - //printf("Base 64 '%s'\n", base64); // send accept to client sprintf(buf, "HTTP/1.1 101 Switching Protocols\r\n"); antd_send(client, buf, strlen(buf)); @@ -1063,7 +1279,8 @@ void *decode_multi_part_request(void *data, const char *ctype) int len; antd_request_t *rq = (antd_request_t *)data; antd_task_t *task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); - antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE | TASK_EVT_ON_READABLE); + //antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE | TASK_EVT_ON_READABLE); + antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_READABLE); //dictionary dic = NULL; boundary = strsep(&str_copy, "="); //discard first part boundary = str_copy; @@ -1097,7 +1314,7 @@ void *decode_multi_part_request_data(void *data) char *token, *keytoken, *valtoken; antd_request_t *rq = (antd_request_t *)data; antd_task_t *task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); - antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE | TASK_EVT_ON_READABLE); + //antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE | TASK_EVT_ON_READABLE); char *boundary = (char *)dvalue(rq->request, "MULTI_PART_BOUNDARY"); dictionary_t dic = (dictionary_t)dvalue(rq->request, "REQUEST_DATA"); // search for content disposition: @@ -1214,20 +1431,23 @@ void *decode_multi_part_request_data(void *data) } free(part_name); } - //printf("[Lines]:%s\n",line); // check if end of request if (line && strstr(line, boundend)) { //LOG("End request %s", boundend); free(boundend); + antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE); return task; } + free(boundend); if (line && strstr(line, boundary)) { // continue upload + antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_READABLE); task->handle = decode_multi_part_request_data; + return task; } - free(boundend); + antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE); return task; } /** @@ -1356,7 +1576,6 @@ void *execute_plugin(void *data, const char *pname) { free(task); task = antd_create_task(decode_post_request, (void *)rq, fn, rq->client->last_io); - antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE | TASK_EVT_ON_READABLE); } return task; } diff --git a/httpd.c b/httpd.c index e487ee6..1684026 100644 --- a/httpd.c +++ b/httpd.c @@ -297,6 +297,13 @@ void antd_scheduler_ext_statistic(int fd, void *user_data) void antd_scheduler_destroy_data(void *data) { + antd_request_t *rq = (antd_request_t *)data; + antd_client_t *proxy = (antd_client_t *)dvalue(rq->request, "PROXY_HANDLE"); + if(proxy) + { + printf("closing proxy \n"); + close(proxy->sock); + } finish_request(data); } diff --git a/lib/handle.c b/lib/handle.c index 1ebb29c..3eaf5aa 100644 --- a/lib/handle.c +++ b/lib/handle.c @@ -603,11 +603,11 @@ int antd_recv_upto(void *src, void *data, int len) time(&source->last_io); return received; } - if (received == 0 || (errno != EAGAIN && errno != EWOULDBLOCK)) + if (received <= 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) { - return -1; + return 0; } - return 0; + return -1; #ifdef USE_OPENSSL } #endif diff --git a/lib/handle.h b/lib/handle.h index 2dcd3a1..fdbb923 100644 --- a/lib/handle.h +++ b/lib/handle.h @@ -25,6 +25,8 @@ #define ANTD_CLIENT_PROTO_CHECK 0x4 #define ANTD_CLIENT_RESOLVE_REQUEST 0x5 #define ANTD_CLIENT_SERVE_FILE 0x6 +#define ANTD_CLIENT_RQ_DATA_DECODE 0x7 +#define ANTD_CLIENT_PROXY_MONITOR 0x8 typedef enum { diff --git a/lib/scheduler.c b/lib/scheduler.c index 66a73db..1539779 100644 --- a/lib/scheduler.c +++ b/lib/scheduler.c @@ -10,7 +10,7 @@ #include "utils.h" #include "bst.h" -#define MAX_VALIDITY_INTERVAL 60 // 1minute +#define MAX_VALIDITY_INTERVAL 30 // s #define MAX_FIFO_NAME_SZ 255 // callback definition @@ -644,33 +644,36 @@ static void antd_deploy_task(bst_node_t* node, void** argv, int argc) return; antd_scheduler_t* sched = (antd_scheduler_t*) argv[0]; antd_task_t* task = node->data; + pthread_mutex_lock(&sched->scheduler_lock); + sched->task_queue = bst_delete(sched->task_queue, task->id); + pthread_mutex_unlock(&sched->scheduler_lock); antd_task_schedule(sched, task); } static void task_event_collect(bst_node_t* node, void** argv, int argc) { UNUSED(argc); antd_task_t* task = (antd_task_t*) node->data; - antd_queue_t* exec_list = (antd_queue_t*) argv[0]; + bst_node_t** exec_list = (bst_node_t**) argv[0]; bst_node_t** poll_list = (bst_node_t**) argv[1]; int* pollsize = (int*) argv[2]; if(!task->events) { - enqueue(exec_list, task); + *exec_list = bst_insert(*exec_list,task->id, task); return; } antd_queue_item_t it = task->events; while(it) { - if(it->evt->flags & TASK_EVT_ALWAY_ON) + if((it->evt->flags & TASK_EVT_ALWAY_ON) || antd_scheduler_validate_data(task) == 0 ) { - enqueue(exec_list, task); + *exec_list = bst_insert(*exec_list,task->id, task); } else if(it->evt->flags & TASK_EVT_ON_TIMEOUT) { // check if timeout if(difftime(time(NULL),task->stamp) > it->evt->timeout ) { - enqueue(exec_list, task); + *exec_list = bst_insert(*exec_list,task->id, task); } } else @@ -696,7 +699,7 @@ void *antd_scheduler_wait(void *ptr) { int pollsize, ready; void *argv[3]; - antd_queue_t exec_list = NULL; + //antd_queue_t exec_list = NULL; bst_node_t* poll_list = NULL; bst_node_t* scheduled_list = NULL; antd_queue_item_t it = NULL; @@ -709,14 +712,14 @@ void *antd_scheduler_wait(void *ptr) while (scheduler->status) { pollsize = 0; - argv[0] = &exec_list; + argv[0] = &scheduled_list; argv[1] = &poll_list; argv[2] = &pollsize; pthread_mutex_lock(&scheduler->scheduler_lock); bst_for_each(scheduler->task_queue, task_event_collect, argv, 3); pthread_mutex_unlock(&scheduler->scheduler_lock); // schedule exec list first - it = exec_list; + /*it = exec_list; while(it) { if(it->task) @@ -730,7 +733,7 @@ void *antd_scheduler_wait(void *ptr) curr = it; it = it->next; free(curr); - } + }*/ // Detect event on pollist if(pollsize > 0) { @@ -768,37 +771,33 @@ void *antd_scheduler_wait(void *ptr) // event triggered schedule the task pthread_mutex_lock(&scheduler->scheduler_lock); task_node = bst_find(scheduler->task_queue, eit->task->id); - if(task_node) - scheduler->task_queue = bst_delete(scheduler->task_queue, eit->task->id); pthread_mutex_unlock(&scheduler->scheduler_lock); if(task_node) scheduled_list = bst_insert(scheduled_list, eit->task->id, eit->task); //antd_task_schedule(scheduler, eit->task); } - else if( (pfds[i].revents & POLLERR) || (pfds[i].revents & POLLHUP) ) { + else if( (pfds[i].revents & POLLERR) || (pfds[i].revents & POLLHUP)) { // task is no longer available ERROR("Poll: Task %d is no longer valid. Remove it", eit->task->id); - // remove task from task queue - pthread_mutex_lock(&scheduler->scheduler_lock); - scheduler->task_queue = bst_delete(scheduler->task_queue, eit->task->id); - pthread_mutex_unlock(&scheduler->scheduler_lock); + eit->task->access_time = 0; + eit->task->handle = NULL; antd_scheduler_destroy_data(eit->task->data); - destroy_task(eit->task); + eit->task->data = NULL; + scheduled_list = bst_insert(scheduled_list, eit->task->id, eit->task); } } } - if(scheduled_list) - { - argv[0] = scheduler; - bst_for_each(scheduled_list, antd_deploy_task, argv, 1); - bst_free(scheduled_list); - scheduled_list = NULL; - } } free(pfds); } } - exec_list = NULL; + if(scheduled_list) + { + argv[0] = scheduler; + bst_for_each(scheduled_list, antd_deploy_task, argv, 1); + bst_free(scheduled_list); + scheduled_list = NULL; + } bst_free(poll_list); poll_list = NULL; diff --git a/lib/scheduler.h b/lib/scheduler.h index e8b1d6f..5278e0a 100644 --- a/lib/scheduler.h +++ b/lib/scheduler.h @@ -11,7 +11,7 @@ #define TASK_EVT_ON_READABLE 0x02 #define TASK_EVT_ON_WRITABLE 0x04 #define TASK_EVT_ON_TIMEOUT 0x08 -#define POLL_EVENT_TO 100 // ms +#define POLL_EVENT_TO 50 // ms typedef struct _antd_scheduler_t antd_scheduler_t; typedef struct _antd_callback_t antd_callback_t;