diff --git a/dist/antd-1.0.6b.tar.gz b/dist/antd-1.0.6b.tar.gz index 22b71f1..7f2cb38 100644 Binary files a/dist/antd-1.0.6b.tar.gz and b/dist/antd-1.0.6b.tar.gz differ diff --git a/http_server.c b/http_server.c index f2d472a..8b7d025 100644 --- a/http_server.c +++ b/http_server.c @@ -1,4 +1,6 @@ -//#define _GNU_SOURCE +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif #include #include #include @@ -304,7 +306,7 @@ void *accept_request(void *data) antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE | TASK_EVT_ON_READABLE); // first verify if the socket is ready antd_client_t *client = (antd_client_t *)rq->client; - + struct pollfd pfd[1]; pfd[0].fd = client->sock; @@ -316,7 +318,7 @@ void *accept_request(void *data) antd_error(rq->client, 400, "Bad request"); return task; } - if(pfd[0].revents & POLLERR || pfd[0].revents & POLLHUP) + if (pfd[0].revents & POLLERR || pfd[0].revents & POLLHUP) { antd_error(rq->client, 400, "Bad request"); return task; @@ -410,7 +412,6 @@ void *resolve_request(void *data) char path[2 * BUFFLEN]; antd_request_t *rq = (antd_request_t *)data; antd_task_t *task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); - antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE | TASK_EVT_ON_READABLE); char *url = (char *)dvalue(rq->request, "RESOURCE_PATH"); char *newurl = NULL; char *rqp = NULL; @@ -482,7 +483,6 @@ void *resolve_request(void *data) if (strcmp(mime_type, "application/octet-stream") == 0) { char *ex = ext(path); - //printf("Path: %s\n", path); char *h = NULL; if (ex) { @@ -523,6 +523,7 @@ void *resolve_request(void *data) } } } + antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE); task->handle = serve_file; } return task; @@ -531,6 +532,8 @@ void *resolve_request(void *data) void *finish_request(void *data) { + if (!data) + return NULL; destroy_request(data); server_config.connection--; LOG("Remaining connection %d", server_config.connection); @@ -566,7 +569,6 @@ int rule_check(const char *k, const char *v, const char *host, const char *_url, } tmp = (char *)v; char *search = "<([a-zA-Z0-9]+)>"; - //printf("match again %s\n",tmp); while ((ret = regex_match(search, tmp, 2, val_matches))) { memcpy(buf + idx, tmp, val_matches[1].rm_so - 1); @@ -616,7 +618,6 @@ void *serve_file(void *data) { antd_request_t *rq = (antd_request_t *)data; antd_task_t *task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); - antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE | TASK_EVT_ON_READABLE); char *path = (char *)dvalue(rq->request, "ABS_RESOURCE_PATH"); char *mime_type = (char *)dvalue(rq->request, "RESOURCE_MIME"); rq->client->state = ANTD_CLIENT_SERVE_FILE; @@ -737,7 +738,6 @@ char *apply_rules(dictionary_t rules, const char *host, char *url) { k = it->key; v = (char *)it->value; - //printf("[%s]: [%s]\n", k, v); // 1 group if (rule_check(k, v, host, url, query_string, url)) { @@ -755,6 +755,226 @@ char *apply_rules(dictionary_t rules, const char *host, char *url) return strdup(query_string); } + +static void *proxy_monitor(void *data) +{ + antd_request_t *rq = (antd_request_t *)data; + rq->client->state = ANTD_CLIENT_PROXY_MONITOR; + antd_client_t *proxy = (antd_client_t *)dvalue(rq->request, "PROXY_HANDLE"); + antd_task_t *task = antd_create_task(NULL, data, NULL, rq->client->last_io); + int ret, sz1 = 0, sz2 = 0; + char *buf = NULL; + buf = (char *)malloc(BUFFLEN); + struct pollfd pfd[1]; + memset(pfd, 0, sizeof(pfd)); + pfd[0].fd = proxy->sock; + pfd[0].events = POLLIN; + ret = 1; + + if (poll(pfd, 1, 0) < 0) + { + (void)close(proxy->sock); + return task; + } + do + { + sz1 = antd_recv_upto(rq->client, buf, BUFFLEN); + + if ((sz1 < 0) || (sz1 > 0 && antd_send(proxy, buf, sz1) != sz1)) + { + ret = 0; + break; + } + sz2 = antd_recv_upto(proxy, buf, BUFFLEN); + if (sz2 < 0 || (sz2 > 0 && antd_send(rq->client, buf, sz2) != sz2)) + { + ret = 0; + break; + } + } while (sz1 > 0 || sz2 > 0); + free(buf); + + if (ret == 0) + { + (void)close(proxy->sock); + return task; + } + if (sz2 == 0) + { + if ( + pfd[0].revents & POLLERR || + pfd[0].revents & POLLRDHUP || + pfd[0].revents & POLLHUP || + pfd[0].revents & POLLNVAL) //|| + //pfd[0].revents & POLLIN) + { + (void)close(proxy->sock); + return task; + } + } + if(pfd[0].revents & POLLIN) + { + antd_task_bind_event(task, proxy->sock, 0, TASK_EVT_ON_READABLE); + } + else + { + antd_task_bind_event(task, proxy->sock, 100u, TASK_EVT_ON_TIMEOUT); + } + task->handle = proxy_monitor; + task->access_time = rq->client->last_io; + + antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_READABLE); + return task; +} + +static void *proxify(void *data) +{ + int sock_fd, size, ret; + char *str = NULL; + chain_t it; + antd_request_t *rq = (antd_request_t *)data; + antd_client_t *proxy = NULL; + rq->client->state = ANTD_CLIENT_RESOLVE_REQUEST; + char *host = dvalue(rq->request, "PROXY_HOST"); + int port = atoi(dvalue(rq->request, "PROXY_PORT")); + char *path = dvalue(rq->request, "PROXY_PATH"); + char *query = dvalue(rq->request, "PROXY_QUERY"); + char* ptr, *ip; + dictionary_t xheader = dvalue(rq->request, "REQUEST_HEADER"); + antd_task_t *task = antd_create_task(NULL, data, NULL, rq->client->last_io); + if (!xheader) + { + antd_error(rq->client, 400, "Badd Request"); + return task; + } + pthread_mutex_lock(&server_mux); + ip = NULL; + // ip_from_host is not threadsafe, need to lock it + ptr = ip_from_hostname(host); + if(ptr) + { + ip = strdup(ptr); + } + pthread_mutex_unlock(&server_mux); + + if(!ip) + { + antd_error(rq->client, 502, "Badd address"); + return task; + } + + sock_fd = request_socket(ip, port); + free(ip); + if (sock_fd == -1) + { + antd_error(rq->client, 503, "Service Unavailable"); + return task; + } + set_nonblock(sock_fd); + /*struct timeval timeout; + timeout.tv_sec = 2; + timeout.tv_usec = 0; //POLL_EVENT_TO*1000; + if (setsockopt(sock_fd, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout, sizeof(timeout)) < 0) + { + ERROR("setsockopt failed:%s", strerror(errno)); + antd_error(rq->client, 500, "Internal proxy error"); + (void)close(sock_fd); + return task; + } + + if (setsockopt(sock_fd, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout, sizeof(timeout)) < 0) + { + ERROR("setsockopt failed:%s", strerror(errno)); + antd_error(rq->client, 500, "Internal proxy error"); + (void)close(sock_fd); + return task; + }*/ + + proxy = (antd_client_t *)malloc(sizeof(antd_client_t)); + proxy->sock = sock_fd; + proxy->ssl = NULL; + proxy->zstream = NULL; + proxy->z_level = ANTD_CNONE; + time(&proxy->last_io); + + // store content length here + dput(rq->request, "PROXY_HANDLE", proxy); + + str = __s("%s %s?%s HTTP/1.1\r\n", (char *)dvalue(rq->request, "METHOD"), path, query); + size = strlen(str); + ret = antd_send(proxy, str, size); + free(str); + if (ret != size) + { + antd_error(rq->client, 500, ""); + (void)close(sock_fd); + return task; + } + for_each_assoc(it, xheader) + { + str = __s("%s: %s\r\n", it->key, (char *)it->value); + size = strlen(str); + ret = antd_send(proxy, str, size); + free(str); + if (ret != size) + { + antd_error(rq->client, 500, ""); + (void)close(sock_fd); + return task; + } + } + (void)antd_send(proxy, "\r\n", 2); + // now monitor the proxy + task->handle = proxy_monitor; + task->access_time = rq->client->last_io; + // register event + + antd_task_bind_event(task, proxy->sock, 0, TASK_EVT_ON_READABLE | TASK_EVT_ON_WRITABLE); + return task; +} +/** + * Check if the current request is e reverse proxy + * return a proxy task if this is the case +*/ +static void *check_proxy(antd_request_t *rq, const char *path, const char *query) +{ + char *pattern = "^(https?)://([^:]+):([0-9]+)(.*)$"; + antd_task_t *task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); + char buff[256]; + regmatch_t matches[5]; + int ret, size; + ret = regex_match(pattern, path, 5, matches); + + if (!ret) + { + return NULL; + } + + if (matches[1].rm_eo - matches[1].rm_so == 5) + { + // https is not supported for now + // TODO add https support + antd_error(rq->client, 503, "Service Unavailable"); + return task; + } + // http proxy request + size = matches[2].rm_eo - matches[2].rm_so < (int)sizeof(buff) ? matches[2].rm_eo - matches[2].rm_so : (int)sizeof(buff); + (void)memcpy(buff, path + matches[2].rm_so, size); + buff[size] = '\0'; + dput(rq->request, "PROXY_HOST", strdup(buff)); + + size = matches[3].rm_eo - matches[3].rm_so < (int)sizeof(buff) ? matches[3].rm_eo - matches[3].rm_so : (int)sizeof(buff); + (void)memcpy(buff, path + matches[3].rm_so, size); + buff[size] = '\0'; + dput(rq->request, "PROXY_PORT", strdup(buff)); + + dput(rq->request, "PROXY_PATH", strdup(path + matches[4].rm_so)); + dput(rq->request, "PROXY_QUERY", strdup(query)); + + task->handle = proxify; + antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_READABLE | TASK_EVT_ON_WRITABLE); + return task; +} /** * Decode the HTTP request header */ @@ -811,10 +1031,16 @@ void *decode_request_header(void *data) antd_error(rq->client, 413, "Payload Too Large"); ERROR("Header size too large (%d): %d vs %d", rq->client->sock, header_size, HEADER_MAX_SIZE); task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); - antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE | TASK_EVT_ON_READABLE); return task; } } + if (ret == 0) + { + //antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE | TASK_EVT_ON_READABLE); + task = antd_create_task(decode_request_header, (void *)rq, NULL, rq->client->last_io); + antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_READABLE); + return task; + } // check for content length size line = (char *)dvalue(xheader, "Content-Length"); if (line) @@ -827,7 +1053,6 @@ void *decode_request_header(void *data) // 100 ms sleep usleep(100000); task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); - antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE | TASK_EVT_ON_READABLE); return task; } } @@ -865,6 +1090,16 @@ void *decode_request_header(void *data) dput(rq->request, "COOKIE", cookie); if (host) free(host); + + // check if this is a reverse proxy ? + task = check_proxy(rq, buf, query); + if (task) + { + if (query) + free(query); + return task; + } + dput(rq->request, "RESOURCE_PATH", url_decode(buf)); if (query) { @@ -873,7 +1108,7 @@ void *decode_request_header(void *data) } // header ok, now checkmethod task = antd_create_task(decode_request, (void *)rq, NULL, rq->client->last_io); - antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE | TASK_EVT_ON_READABLE); + antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE | TASK_EVT_ON_READABLE); // return task; } @@ -892,8 +1127,8 @@ void *decode_request(void *data) ws = 1; method = (char *)dvalue(rq->request, "METHOD"); task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); - antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE | TASK_EVT_ON_READABLE); - if (strcmp(method, "GET") == 0 || strcmp(method, "HEAD") == 0 || strcmp(method, "OPTIONS") == 0) + //antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE | TASK_EVT_ON_READABLE); + if (EQU(method, "GET")) { //if(ctype) free(ctype); if (ws && ws_key != NULL) @@ -908,7 +1143,12 @@ void *decode_request(void *data) task->handle = resolve_request; return task; } - else if (strcmp(method, "POST") == 0) + else if (EQU(method, "HEAD") || EQU(method, "OPTIONS") || EQU(method, "DELETE")) + { + task->handle = resolve_request; + return task; + } + else if (EQU(method, "POST") || EQU(method, "PUT") || EQU(method, "PATCH")) { task->handle = resolve_request; return task; @@ -923,6 +1163,7 @@ void *decode_request(void *data) void *decode_post_request(void *data) { antd_request_t *rq = (antd_request_t *)data; + rq->client->state = ANTD_CLIENT_RQ_DATA_DECODE; dictionary_t request = dvalue(rq->request, "REQUEST_DATA"); dictionary_t headers = dvalue(rq->request, "REQUEST_HEADER"); char *ctype = NULL; @@ -935,8 +1176,8 @@ void *decode_post_request(void *data) clen = atoi(tmp); char *method = (char *)dvalue(rq->request, "METHOD"); task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); - antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE | TASK_EVT_ON_READABLE); - if (!method || strcmp(method, "POST") != 0) + //antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE | TASK_EVT_ON_READABLE); + if (!method || (!EQU(method, "POST") && !EQU(method, "PUT") && !EQU(method, "PATCH"))) return task; if (ctype == NULL || clen == -1) { @@ -954,7 +1195,11 @@ void *decode_post_request(void *data) } else if (clen > 0) { - task->handle = decode_post_request; + // WARN: this may not work on ssl socket + // antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_READABLE | TASK_EVT_ON_WRITABLE); + // task->handle = decode_post_request; + antd_error(rq->client, 400, "Bad Request, missing content data"); + return task; } } else if (strstr(ctype, FORM_MULTI_PART)) @@ -965,7 +1210,6 @@ void *decode_post_request(void *data) else { char *pquery = post_data_decode(rq->client, clen); - //printf("POST: %s\n", pquery); char *key = strstr(ctype, "/"); if (key) key++; @@ -978,9 +1222,13 @@ void *decode_post_request(void *data) } else if (clen > 0) { - task->handle = decode_post_request; + //task->handle = decode_post_request; + //antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_READABLE | TASK_EVT_ON_WRITABLE); + antd_error(rq->client, 400, "Bad Request, missing content data"); + return task; } } + antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE); return task; } @@ -1000,7 +1248,6 @@ void ws_confirm_request(void *client, const char *key) if (n < 0) n = 0; strncat(rkey, WS_MAGIC_STRING, n); - //printf("RESPONDKEY '%s'\n", rkey); #ifdef USE_OPENSSL SHA_CTX context; #else @@ -1011,7 +1258,6 @@ void ws_confirm_request(void *client, const char *key) SHA1_Update(&context, rkey, strlen(rkey)); SHA1_Final((uint8_t *)sha_d, &context); Base64encode(base64, sha_d, 20); - //printf("Base 64 '%s'\n", base64); // send accept to client sprintf(buf, "HTTP/1.1 101 Switching Protocols\r\n"); antd_send(client, buf, strlen(buf)); @@ -1063,6 +1309,7 @@ void *decode_multi_part_request(void *data, const char *ctype) int len; antd_request_t *rq = (antd_request_t *)data; antd_task_t *task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); + //antd_task_bind_event(task, rq->client->sock, 0, ); antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE | TASK_EVT_ON_READABLE); //dictionary dic = NULL; boundary = strsep(&str_copy, "="); //discard first part @@ -1104,7 +1351,6 @@ void *decode_multi_part_request_data(void *data) while (((len = read_buf(rq->client, buf, sizeof(buf))) > 0) && !strstr(buf, "Content-Disposition:")) ; ; - if (len <= 0 || !strstr(buf, "Content-Disposition:")) { return task; @@ -1214,20 +1460,23 @@ void *decode_multi_part_request_data(void *data) } free(part_name); } - //printf("[Lines]:%s\n",line); // check if end of request if (line && strstr(line, boundend)) { //LOG("End request %s", boundend); free(boundend); + //antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE); return task; } + free(boundend); if (line && strstr(line, boundary)) { // continue upload + //antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_READABLE); task->handle = decode_multi_part_request_data; + return task; } - free(boundend); + //antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE); return task; } /** @@ -1275,7 +1524,7 @@ char *post_data_decode(void *client, int len) char *ptr = query; int readlen = len > BUFFLEN ? BUFFLEN : len; int read = 0, stat = 1; - while (readlen > 0 && stat > 0) + while (readlen > 0 && stat >= 0) { stat = antd_recv_upto(client, ptr + read, readlen); if (stat > 0) @@ -1283,6 +1532,17 @@ char *post_data_decode(void *client, int len) read += stat; readlen = (len - read) > BUFFLEN ? BUFFLEN : (len - read); } + if (stat == 0) + { + if (difftime(time(NULL), ((antd_client_t*)client)->last_io) > MAX_IO_WAIT_TIME) + { + stat = -1; + } + else + { + usleep(POLL_EVENT_TO*1000); + } + } } if (read > 0) @@ -1356,7 +1616,6 @@ void *execute_plugin(void *data, const char *pname) { free(task); task = antd_create_task(decode_post_request, (void *)rq, fn, rq->client->last_io); - antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE | TASK_EVT_ON_READABLE); } return task; } @@ -1394,4 +1653,4 @@ int compressable(char *ctype) } return 0; } -#endif \ No newline at end of file +#endif diff --git a/httpd.c b/httpd.c index e487ee6..e14930e 100644 --- a/httpd.c +++ b/httpd.c @@ -297,6 +297,12 @@ void antd_scheduler_ext_statistic(int fd, void *user_data) void antd_scheduler_destroy_data(void *data) { + antd_request_t *rq = (antd_request_t *)data; + antd_client_t *proxy = (antd_client_t *)dvalue(rq->request, "PROXY_HANDLE"); + if(proxy) + { + close(proxy->sock); + } finish_request(data); } @@ -306,8 +312,8 @@ int antd_task_data_id(void *data) if(!rq) return 0; return antd_scheduler_next_id(scheduler,rq->client->sock); - //UNUSED(data); - //return antd_scheduler_next_id(scheduler,0); + /*UNUSED(data); + return antd_scheduler_next_id(scheduler,0);*/ } int main(int argc, char *argv[]) diff --git a/lib/handle.c b/lib/handle.c index 1ebb29c..2353978 100644 --- a/lib/handle.c +++ b/lib/handle.c @@ -565,6 +565,10 @@ int antd_recv_upto(void *src, void *data, int len) } else { + /*if (difftime(time(NULL), source->last_io) > MAX_IO_WAIT_TIME) + { + return -1; + }*/ switch (err) { case SSL_ERROR_NONE: @@ -603,11 +607,15 @@ int antd_recv_upto(void *src, void *data, int len) time(&source->last_io); return received; } - if (received == 0 || (errno != EAGAIN && errno != EWOULDBLOCK)) + /*else if (difftime(time(NULL), source->last_io) > MAX_IO_WAIT_TIME) { return -1; + }*/ + if (received <= 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) + { + return 0; } - return 0; + return -1; #ifdef USE_OPENSSL } #endif diff --git a/lib/handle.h b/lib/handle.h index 2dcd3a1..fdbb923 100644 --- a/lib/handle.h +++ b/lib/handle.h @@ -25,6 +25,8 @@ #define ANTD_CLIENT_PROTO_CHECK 0x4 #define ANTD_CLIENT_RESOLVE_REQUEST 0x5 #define ANTD_CLIENT_SERVE_FILE 0x6 +#define ANTD_CLIENT_RQ_DATA_DECODE 0x7 +#define ANTD_CLIENT_PROXY_MONITOR 0x8 typedef enum { diff --git a/lib/scheduler.c b/lib/scheduler.c index 66a73db..076c112 100644 --- a/lib/scheduler.c +++ b/lib/scheduler.c @@ -5,12 +5,13 @@ #include #include #include +#include #include #include "scheduler.h" #include "utils.h" #include "bst.h" -#define MAX_VALIDITY_INTERVAL 60 // 1minute +#define MAX_VALIDITY_INTERVAL 30 // s #define MAX_FIFO_NAME_SZ 255 // callback definition @@ -24,6 +25,7 @@ typedef struct { int flags; int fd; + struct timeval stamp; int timeout; // seconds antd_task_t *task; } antd_task_evt_item_t; @@ -47,6 +49,7 @@ typedef struct { int id; pthread_t tid; + antd_task_t* current_task; void *manager; } antd_worker_t; @@ -237,48 +240,58 @@ static void *work(antd_worker_t *worker) } else { + worker->current_task = it->task; //LOG("task executed by worker %d\n", worker->id); antd_execute_task(scheduler, it->task); free(it); + worker->current_task = NULL; } } return NULL; } +static void antd_task_dump(int fd, antd_task_t* task, char* buffer) +{ + if (task == NULL || fd < 0) + { + return; + } + int ret; + // send statistic on task data + snprintf(buffer, MAX_FIFO_NAME_SZ, "---- Task %d created at: %lu ----\n", task->id, task->stamp); + ret = write(fd, buffer, strlen(buffer)); + + // send statistic on task data + snprintf(buffer, MAX_FIFO_NAME_SZ, "Access time: %lu\nn", (unsigned long)task->access_time); + ret = write(fd, buffer, strlen(buffer)); + + snprintf(buffer, MAX_FIFO_NAME_SZ, "Current time: %lu\n", (unsigned long)time(NULL)); + ret = write(fd, buffer, strlen(buffer)); + + if (task->handle) + { + snprintf(buffer, MAX_FIFO_NAME_SZ, "Has handle: yes\n"); + ret = write(fd, buffer, strlen(buffer)); + } + + if (task->callback) + { + snprintf(buffer, MAX_FIFO_NAME_SZ, "Has callback: yes\n"); + ret = write(fd, buffer, strlen(buffer)); + } + UNUSED(ret); + // now print all task data statistic + antd_scheduler_ext_statistic(fd, task->data); +} static void print_static_info(bst_node_t *node, void **args, int argc) { if (argc != 2) { return; } - int ret; char *buffer = args[0]; int *fdp = args[1]; antd_task_t *task = (antd_task_t *)node->data; - // send statistic on task data - snprintf(buffer, MAX_FIFO_NAME_SZ, "---- Task %d created at: %lu ----\n", task->id, task->stamp); - ret = write(*fdp, buffer, strlen(buffer)); - - // send statistic on task data - snprintf(buffer, MAX_FIFO_NAME_SZ, "Access time: %lu\nn", (unsigned long)task->access_time); - ret = write(*fdp, buffer, strlen(buffer)); - - snprintf(buffer, MAX_FIFO_NAME_SZ, "Current time: %lu\n", (unsigned long)time(NULL)); - ret = write(*fdp, buffer, strlen(buffer)); - - if (task->handle) - { - snprintf(buffer, MAX_FIFO_NAME_SZ, "Has handle: yes\n"); - ret = write(*fdp, buffer, strlen(buffer)); - } - - if (task->callback) - { - snprintf(buffer, MAX_FIFO_NAME_SZ, "Has callback: yes\n"); - ret = write(*fdp, buffer, strlen(buffer)); - } - UNUSED(ret); - // now print all task data statistic - antd_scheduler_ext_statistic(*fdp, task->data); + antd_task_dump(*fdp, task, buffer); } static void *statistic(antd_scheduler_t *scheduler) { @@ -329,6 +342,18 @@ static void *statistic(antd_scheduler_t *scheduler) bst_for_each(scheduler->task_queue, print_static_info, argc, 2); pthread_mutex_unlock(&scheduler->scheduler_lock); + + // write worker current task + for (int i = 0; i < scheduler->n_workers; i++) + { + snprintf(buffer, MAX_FIFO_NAME_SZ, "Worker: %d. Detail:\n", i); + ret = write(scheduler->stat_fd, buffer, strlen(buffer)); + if(scheduler->workers[i].current_task) + { + antd_task_dump(scheduler->stat_fd, scheduler->workers[i].current_task, buffer); + } + } + ret = close(scheduler->stat_fd); scheduler->stat_fd = -1; usleep(5000); @@ -421,6 +446,7 @@ antd_scheduler_t *antd_scheduler_init(int n, const char *stat_name) { scheduler->workers[i].id = -1; scheduler->workers[i].manager = (void *)scheduler; + scheduler->workers[i].current_task = NULL; if (pthread_create(&scheduler->workers[i].tid, NULL, (void *(*)(void *))work, (void *)&scheduler->workers[i]) != 0) { ERROR("pthread_create: cannot create worker: %s", strerror(errno)); @@ -644,33 +670,40 @@ static void antd_deploy_task(bst_node_t* node, void** argv, int argc) return; antd_scheduler_t* sched = (antd_scheduler_t*) argv[0]; antd_task_t* task = node->data; + pthread_mutex_lock(&sched->scheduler_lock); + sched->task_queue = bst_delete(sched->task_queue, task->id); + pthread_mutex_unlock(&sched->scheduler_lock); antd_task_schedule(sched, task); } static void task_event_collect(bst_node_t* node, void** argv, int argc) { UNUSED(argc); antd_task_t* task = (antd_task_t*) node->data; - antd_queue_t* exec_list = (antd_queue_t*) argv[0]; + bst_node_t** exec_list = (bst_node_t**) argv[0]; bst_node_t** poll_list = (bst_node_t**) argv[1]; + struct timeval now; int* pollsize = (int*) argv[2]; if(!task->events) { - enqueue(exec_list, task); + *exec_list = bst_insert(*exec_list,task->id, task); return; } antd_queue_item_t it = task->events; while(it) { - if(it->evt->flags & TASK_EVT_ALWAY_ON) + if((it->evt->flags & TASK_EVT_ALWAY_ON) || antd_scheduler_validate_data(task) == 0 ) { - enqueue(exec_list, task); + *exec_list = bst_insert(*exec_list,task->id, task); } else if(it->evt->flags & TASK_EVT_ON_TIMEOUT) { // check if timeout - if(difftime(time(NULL),task->stamp) > it->evt->timeout ) + gettimeofday(&now, NULL); + //do stuff + int diff = (int)(((now.tv_sec - it->evt->stamp.tv_sec) * 1000000 + now.tv_usec - it->evt->stamp.tv_usec) / 1000); + if( diff >= it->evt->timeout ) { - enqueue(exec_list, task); + *exec_list = bst_insert(*exec_list,task->id, task); } } else @@ -689,6 +722,7 @@ void antd_task_bind_event(antd_task_t *task, int fd, int timeout, int flags) eit->timeout = timeout; eit->flags = flags; eit->task = task; + gettimeofday(&eit->stamp, NULL); enqueue(&task->events, eit); } @@ -696,11 +730,9 @@ void *antd_scheduler_wait(void *ptr) { int pollsize, ready; void *argv[3]; - antd_queue_t exec_list = NULL; + //antd_queue_t exec_list = NULL; bst_node_t* poll_list = NULL; bst_node_t* scheduled_list = NULL; - antd_queue_item_t it = NULL; - antd_queue_item_t curr = NULL; antd_task_evt_item_t *eit = NULL; bst_node_t* node, *task_node = NULL; struct pollfd *pfds = NULL; @@ -709,14 +741,14 @@ void *antd_scheduler_wait(void *ptr) while (scheduler->status) { pollsize = 0; - argv[0] = &exec_list; + argv[0] = &scheduled_list; argv[1] = &poll_list; argv[2] = &pollsize; pthread_mutex_lock(&scheduler->scheduler_lock); bst_for_each(scheduler->task_queue, task_event_collect, argv, 3); pthread_mutex_unlock(&scheduler->scheduler_lock); // schedule exec list first - it = exec_list; + /*it = exec_list; while(it) { if(it->task) @@ -730,7 +762,7 @@ void *antd_scheduler_wait(void *ptr) curr = it; it = it->next; free(curr); - } + }*/ // Detect event on pollist if(pollsize > 0) { @@ -768,37 +800,34 @@ void *antd_scheduler_wait(void *ptr) // event triggered schedule the task pthread_mutex_lock(&scheduler->scheduler_lock); task_node = bst_find(scheduler->task_queue, eit->task->id); - if(task_node) - scheduler->task_queue = bst_delete(scheduler->task_queue, eit->task->id); pthread_mutex_unlock(&scheduler->scheduler_lock); if(task_node) scheduled_list = bst_insert(scheduled_list, eit->task->id, eit->task); //antd_task_schedule(scheduler, eit->task); } - else if( (pfds[i].revents & POLLERR) || (pfds[i].revents & POLLHUP) ) { + else if( (pfds[i].revents & POLLERR) || (pfds[i].revents & POLLHUP)) { // task is no longer available ERROR("Poll: Task %d is no longer valid. Remove it", eit->task->id); - // remove task from task queue - pthread_mutex_lock(&scheduler->scheduler_lock); - scheduler->task_queue = bst_delete(scheduler->task_queue, eit->task->id); - pthread_mutex_unlock(&scheduler->scheduler_lock); + eit->task->access_time = 0; + eit->task->handle = NULL; + /* antd_scheduler_destroy_data(eit->task->data); - destroy_task(eit->task); + eit->task->data = NULL;*/ + scheduled_list = bst_insert(scheduled_list, eit->task->id, eit->task); } } } - if(scheduled_list) - { - argv[0] = scheduler; - bst_for_each(scheduled_list, antd_deploy_task, argv, 1); - bst_free(scheduled_list); - scheduled_list = NULL; - } } free(pfds); } } - exec_list = NULL; + if(scheduled_list) + { + argv[0] = scheduler; + bst_for_each(scheduled_list, antd_deploy_task, argv, 1); + bst_free(scheduled_list); + scheduled_list = NULL; + } bst_free(poll_list); poll_list = NULL; diff --git a/lib/scheduler.h b/lib/scheduler.h index e8b1d6f..2d11a13 100644 --- a/lib/scheduler.h +++ b/lib/scheduler.h @@ -11,7 +11,7 @@ #define TASK_EVT_ON_READABLE 0x02 #define TASK_EVT_ON_WRITABLE 0x04 #define TASK_EVT_ON_TIMEOUT 0x08 -#define POLL_EVENT_TO 100 // ms +#define POLL_EVENT_TO 10 // ms typedef struct _antd_scheduler_t antd_scheduler_t; typedef struct _antd_callback_t antd_callback_t; diff --git a/lib/utils.c b/lib/utils.c index dcf4599..fc5d270 100644 --- a/lib/utils.c +++ b/lib/utils.c @@ -605,7 +605,11 @@ int request_socket(const char *ip, int port) { int sockfd; struct sockaddr_in dest; - + if(!ip) + { + ERROR("Invalid IP address"); + return -1; + } if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { ERROR("Socket: %s", strerror(errno)); @@ -638,6 +642,10 @@ char* ip_from_hostname(const char *hostname) struct hostent *he; struct in_addr **addr_list; int i; + if(!hostname) + { + return NULL; + } if ((he = gethostbyname(hostname)) == NULL) { // get the host info