diff --git a/antd-config.ini b/antd-config.ini index f9b2605..de3941f 100644 --- a/antd-config.ini +++ b/antd-config.ini @@ -36,7 +36,7 @@ gzip_types = text\/.*,.*\/css,.*\/json,.*\/javascript [PORT:443] htdocs=/opt/www/htdocs ; enable or disable SSL -ssl.enable=1 +ssl.enable=0 ; other config shoud be rules applied on this port ; For example the following rule will ; convert a request of type: @@ -46,12 +46,15 @@ ssl.enable=1 ; this is helpful to redirect many sub domains ; to a sub folder of the same server ; ^([a-zA-Z][a-zA-Z0-9]*)\.[a-zA-Z0-9]+\..*$ = /<1>? +; example of reverse proxy +; ^\/os\/+(.*)$ = http://localhost:80/os/router.lua?r=<1>& ; Sytax: [regular expression on the original request]=[new request rule] [PORT:80] htdocs=/opt/www/htdocs ; enable or disable SSL ssl.enable=0 +^\/os\/+(.*)$ = http://localhost:443/test.html? ; other config shoud be rules applied on this port ; For example the following rule will ; convert a request of type: diff --git a/dist/antd-1.0.6b.tar.gz b/dist/antd-1.0.6b.tar.gz index 01276a6..5816229 100644 Binary files a/dist/antd-1.0.6b.tar.gz and b/dist/antd-1.0.6b.tar.gz differ diff --git a/http_server.c b/http_server.c index e6f2ab4..f28b544 100644 --- a/http_server.c +++ b/http_server.c @@ -300,7 +300,7 @@ void *accept_request(void *data) antd_request_t *rq = (antd_request_t *)data; task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); - antd_task_bind_event(task,rq->client->sock,0, TASK_EVT_ON_WRITABLE| TASK_EVT_ON_READABLE); + antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE | TASK_EVT_ON_READABLE); fd_set read_flags, write_flags; // first verify if the socket is ready antd_client_t *client = (antd_client_t *)rq->client; @@ -407,7 +407,7 @@ void *resolve_request(void *data) char path[2 * BUFFLEN]; antd_request_t *rq = (antd_request_t *)data; antd_task_t *task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); - antd_task_bind_event(task,rq->client->sock,0, TASK_EVT_ON_WRITABLE| TASK_EVT_ON_READABLE); + antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE | TASK_EVT_ON_READABLE); char *url = (char *)dvalue(rq->request, "RESOURCE_PATH"); char *newurl = NULL; char *rqp = NULL; @@ -613,7 +613,7 @@ void *serve_file(void *data) { antd_request_t *rq = (antd_request_t *)data; antd_task_t *task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); - antd_task_bind_event(task,rq->client->sock,0, TASK_EVT_ON_WRITABLE| TASK_EVT_ON_READABLE); + antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE | TASK_EVT_ON_READABLE); char *path = (char *)dvalue(rq->request, "ABS_RESOURCE_PATH"); char *mime_type = (char *)dvalue(rq->request, "RESOURCE_MIME"); rq->client->state = ANTD_CLIENT_SERVE_FILE; @@ -752,6 +752,174 @@ char *apply_rules(dictionary_t rules, const char *host, char *url) return strdup(query_string); } +static void *proxy_monitor(void *data) +{ + antd_request_t *rq = (antd_request_t *)data; + antd_client_t *proxy = (antd_client_t *)dvalue(rq->request, "PROXY_HANDLE"); + antd_task_t* task = antd_create_task(NULL, data, NULL, rq->client->last_io); + int ret, max_fd; + fd_set read_flags; + // first verify if the socket is ready + FD_ZERO(&read_flags); + FD_SET(rq->client->sock, &read_flags); + FD_SET(proxy->sock, &read_flags); + //FD_ZERO(&write_flags); + //FD_SET(rq->client->sock, &write_flags); + //FD_SET(proxy->sock, &write_flags); + char *buf = NULL; + struct timeval timeout; + timeout.tv_sec = 0; + timeout.tv_usec = 500; + + max_fd = proxy->sock > rq->client->sock ? proxy->sock: rq->client->sock; + + // select + ret = select(max_fd + 1, &read_flags, NULL, (fd_set *)0, &timeout); + if(ret == -1) + { + //antd_error(rq->client, 500, ""); + (void)close(proxy->sock); + return task; + } + if(ret > 0) + { + buf = (char *)malloc(BUFFLEN); + if (FD_ISSET(rq->client->sock, &read_flags)) + { + ret = antd_recv_upto(rq->client, buf, BUFFLEN); + if(ret == -1) + { + free(buf); + (void)close(proxy->sock); + return task; + } + antd_send(proxy, buf, ret); + } + if (FD_ISSET(proxy->sock, &read_flags)) + { + ret = antd_recv_upto(proxy, buf, BUFFLEN); + if(ret == -1) + { + free(buf); + (void)close(proxy->sock); + return task; + } + antd_send(rq->client, buf, ret); + } + free(buf); + } + task->handle = proxy_monitor; + task->access_time = rq->client->last_io; + // register event + antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_READABLE); + antd_task_bind_event(task, proxy->sock, 0, TASK_EVT_ON_READABLE); + return task; +} +static void *proxify(void *data) +{ + int sock_fd, size, ret; + char *str = NULL; + chain_t it; + antd_request_t *rq = (antd_request_t *)data; + antd_client_t *proxy = NULL; + rq->client->state = ANTD_CLIENT_RESOLVE_REQUEST; + char *host = dvalue(rq->request, "PROXY_HOST"); + int port = atoi(dvalue(rq->request, "PROXY_PORT")); + char *path = dvalue(rq->request, "PROXY_PATH"); + char *query = dvalue(rq->request, "PROXY_QUERY"); + dictionary_t xheader = dvalue(rq->request, "REQUEST_HEADER"); + antd_task_t *task = antd_create_task(NULL, data, NULL, rq->client->last_io); + if (!xheader) + { + antd_error(rq->client, 400, "Badd Request"); + return task; + } + sock_fd = request_socket(ip_from_hostname(host), port); + if (sock_fd == -1) + { + antd_error(rq->client, 503, "Service Unavailable"); + return task; + } + proxy = (antd_client_t *)malloc(sizeof(antd_client_t)); + proxy->sock = sock_fd; + proxy->ssl = NULL; + proxy->zstream = NULL; + proxy->z_level = ANTD_CNONE; + dput(rq->request, "PROXY_HANDLE", proxy); + + str = __s("%s %s?%s HTTP/1.1\r\n", (char *)dvalue(rq->request, "METHOD"), path, query); + size = strlen(str); + ret = antd_send(proxy, str, size); + free(str); + if (ret != size) + { + antd_error(rq->client, 500, ""); + (void)close(sock_fd); + return task; + } + for_each_assoc(it, xheader) + { + str = __s("%s: %s\r\n", it->key, (char *)it->value); + size = strlen(str); + ret = antd_send(proxy, str, size); + free(str); + if (ret != size) + { + antd_error(rq->client, 500, ""); + (void)close(sock_fd); + return task; + } + } + (void)antd_send(proxy, "\r\n", 2); + + // now monitor the proxy + task->handle = proxy_monitor; + task->access_time = rq->client->last_io; + // register event + antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_READABLE); + antd_task_bind_event(task, proxy->sock, 0, TASK_EVT_ON_READABLE); + return task; +} +/** + * Check if the current request is e reverse proxy + * return a proxy task if this is the case +*/ +static void *check_proxy(antd_request_t *rq, const char *path, const char *query) +{ + char *pattern = "^(https?)://([^:]+):([0-9]+)(.*)$"; + char buff[256]; + regmatch_t matches[5]; + int ret, size; + ret = regex_match(pattern, path, 5, matches); + + if (!ret) + { + return NULL; + } + + if (matches[1].rm_eo - matches[1].rm_so == 5) + { + // https is not supported for now + // TODO add https support + antd_error(rq->client, 503, "Service Unavailable"); + return antd_create_task(NULL, (void *)rq, NULL, time(NULL)); + } + // http proxy request + size = matches[2].rm_eo - matches[2].rm_so < (int)sizeof(buff) ? matches[2].rm_eo - matches[2].rm_so : (int)sizeof(buff); + (void)memcpy(buff, path + matches[2].rm_so, size); + buff[size] = '\0'; + dput(rq->request, "PROXY_HOST", strdup(buff)); + + size = matches[3].rm_eo - matches[3].rm_so < (int)sizeof(buff) ? matches[3].rm_eo - matches[3].rm_so : (int)sizeof(buff); + (void)memcpy(buff, path + matches[3].rm_so, size); + buff[size] = '\0'; + dput(rq->request, "PROXY_PORT", strdup(buff)); + + dput(rq->request, "PROXY_PATH", strdup(path + matches[4].rm_so)); + dput(rq->request, "PROXY_QUERY", strdup(query)); + + return antd_create_task(proxify, (void *)rq, NULL, rq->client->last_io); +} /** * Decode the HTTP request header */ @@ -771,9 +939,9 @@ void *decode_request_header(void *data) char *url = (char *)dvalue(rq->request, "REQUEST_QUERY"); dictionary_t xheader = dvalue(rq->request, "REQUEST_HEADER"); dictionary_t request = dvalue(rq->request, "REQUEST_DATA"); - char *port_s = (char *)dvalue(xheader, "SERVER_PORT"); + char *port_s = (char *)dvalue(rq->request, "SERVER_PORT"); port_config_t *pcnf = (port_config_t *)dvalue(server_config.ports, port_s); - antd_task_t * task; + antd_task_t *task; // first real all header // this for check if web socket is enabled @@ -808,7 +976,7 @@ void *decode_request_header(void *data) antd_error(rq->client, 413, "Payload Too Large"); ERROR("Header size too large (%d): %d vs %d", rq->client->sock, header_size, HEADER_MAX_SIZE); task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); - antd_task_bind_event(task,rq->client->sock,0, TASK_EVT_ON_WRITABLE| TASK_EVT_ON_READABLE); + antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE | TASK_EVT_ON_READABLE); return task; } } @@ -824,7 +992,7 @@ void *decode_request_header(void *data) // 100 ms sleep usleep(100000); task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); - antd_task_bind_event(task,rq->client->sock,0, TASK_EVT_ON_WRITABLE| TASK_EVT_ON_READABLE); + antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE | TASK_EVT_ON_READABLE); return task; } } @@ -858,19 +1026,28 @@ void *decode_request_header(void *data) LOG("Original query (%d): %s", rq->client->sock, url); query = apply_rules(pcnf->rules, host, buf); LOG("Processed query: %s", query); + if (cookie) + dput(rq->request, "COOKIE", cookie); + if (host) + free(host); + // check if this is a reverse proxy ? + task = check_proxy(rq, buf, query); + if (task) + { + if (query) + free(query); + return task; + } + // otherwise it a normal query dput(rq->request, "RESOURCE_PATH", url_decode(buf)); if (query) { decode_url_request(query, request); free(query); } - if (cookie) - dput(rq->request, "COOKIE", cookie); - if (host) - free(host); // header ok, now checkmethod task = antd_create_task(decode_request, (void *)rq, NULL, rq->client->last_io); - antd_task_bind_event(task,rq->client->sock,0, TASK_EVT_ON_WRITABLE| TASK_EVT_ON_READABLE); + antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE | TASK_EVT_ON_READABLE); return task; } @@ -889,7 +1066,7 @@ void *decode_request(void *data) ws = 1; method = (char *)dvalue(rq->request, "METHOD"); task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); - antd_task_bind_event(task,rq->client->sock,0, TASK_EVT_ON_WRITABLE| TASK_EVT_ON_READABLE); + antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE | TASK_EVT_ON_READABLE); if (strcmp(method, "GET") == 0 || strcmp(method, "HEAD") == 0 || strcmp(method, "OPTIONS") == 0) { //if(ctype) free(ctype); @@ -932,7 +1109,7 @@ void *decode_post_request(void *data) clen = atoi(tmp); char *method = (char *)dvalue(rq->request, "METHOD"); task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); - antd_task_bind_event(task,rq->client->sock,0, TASK_EVT_ON_WRITABLE| TASK_EVT_ON_READABLE); + antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE | TASK_EVT_ON_READABLE); if (!method || strcmp(method, "POST") != 0) return task; if (ctype == NULL || clen == -1) @@ -980,7 +1157,7 @@ void ws_confirm_request(void *client, const char *key) char rkey[128]; char sha_d[20]; char base64[64]; - strncpy(rkey, key, sizeof(rkey)-1); + strncpy(rkey, key, sizeof(rkey) - 1); int n = (int)sizeof(rkey) - (int)strlen(key); if (n < 0) n = 0; @@ -1048,7 +1225,7 @@ void *decode_multi_part_request(void *data, const char *ctype) int len; antd_request_t *rq = (antd_request_t *)data; antd_task_t *task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); - antd_task_bind_event(task,rq->client->sock,0, TASK_EVT_ON_WRITABLE| TASK_EVT_ON_READABLE); + antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE | TASK_EVT_ON_READABLE); //dictionary dic = NULL; boundary = strsep(&str_copy, "="); //discard first part boundary = str_copy; @@ -1082,7 +1259,7 @@ void *decode_multi_part_request_data(void *data) char *token, *keytoken, *valtoken; antd_request_t *rq = (antd_request_t *)data; antd_task_t *task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); - antd_task_bind_event(task,rq->client->sock,0, TASK_EVT_ON_WRITABLE| TASK_EVT_ON_READABLE); + antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE | TASK_EVT_ON_READABLE); char *boundary = (char *)dvalue(rq->request, "MULTI_PART_BOUNDARY"); dictionary_t dic = (dictionary_t)dvalue(rq->request, "REQUEST_DATA"); // search for content disposition: @@ -1303,7 +1480,7 @@ void *execute_plugin(void *data, const char *pname) char *error; antd_request_t *rq = (antd_request_t *)data; antd_task_t *task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); - antd_task_bind_event(task,rq->client->sock,0, TASK_EVT_ON_WRITABLE| TASK_EVT_ON_READABLE); + antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE | TASK_EVT_ON_READABLE); //LOG("Plugin name '%s'", pname); rq->client->state = ANTD_CLIENT_PLUGIN_EXEC; //load the plugin @@ -1341,7 +1518,7 @@ void *execute_plugin(void *data, const char *pname) { free(task); task = antd_create_task(decode_post_request, (void *)rq, fn, rq->client->last_io); - antd_task_bind_event(task,rq->client->sock,0, TASK_EVT_ON_WRITABLE| TASK_EVT_ON_READABLE); + antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE | TASK_EVT_ON_READABLE); } return task; } diff --git a/httpd.c b/httpd.c index 9b4a1d5..e487ee6 100644 --- a/httpd.c +++ b/httpd.c @@ -190,8 +190,8 @@ static void antd_monitor(port_config_t *pcnf) dictionary_t xheader = dict(); dput(request->request, "REQUEST_HEADER", xheader); dput(request->request, "REQUEST_DATA", dict()); - dput(xheader, "SERVER_PORT", (void *)__s("%d", pcnf->port)); - dput(xheader, "SERVER_WWW_ROOT", (void *)strdup(pcnf->htdocs)); + dput(request->request, "SERVER_PORT", (void *)__s("%d", pcnf->port)); + dput(request->request, "SERVER_WWW_ROOT", (void *)strdup(pcnf->htdocs)); /* get the remote IP */ @@ -200,7 +200,7 @@ static void antd_monitor(port_config_t *pcnf) client_ip = inet_ntoa(client_name.sin_addr); LOG("Connect to client IP: %s on port:%d (%d)", client_ip, pcnf->port, client_sock); // ip address - dput(xheader, "REMOTE_ADDR", (void *)strdup(client_ip)); + dput(request->request, "REMOTE_ADDR", (void *)strdup(client_ip)); //LOG("socket: %d\n", client_sock); } @@ -304,8 +304,10 @@ int antd_task_data_id(void *data) { antd_request_t *rq = (antd_request_t *)data; if(!rq) - return 0; + return 0; return antd_scheduler_next_id(scheduler,rq->client->sock); + //UNUSED(data); + //return antd_scheduler_next_id(scheduler,0); } int main(int argc, char *argv[]) diff --git a/lib/dictionary.h b/lib/dictionary.h index 9f65b92..c1d23ec 100644 --- a/lib/dictionary.h +++ b/lib/dictionary.h @@ -24,35 +24,37 @@ THE SOFTWARE. #ifndef DICTIONARY_H #define DICTIONARY_H -#define DHASHSIZE 16 -#define for_each_assoc(assoc, dic) \ - for(unsigned int i = 0; i < dic->cap; i++) \ - for(assoc = dic->map[i];assoc!= NULL; assoc = assoc->next) +#define DHASHSIZE 16 +#define for_each_assoc(assoc, dic) \ + for (unsigned int i = 0; i < dic->cap; i++) \ + for (assoc = dic->map[i]; assoc != NULL; assoc = assoc->next) /** * Dictionary for header */ -typedef struct __assoc{ - struct __assoc *next; - char *key; - void* value; +typedef struct __assoc +{ + struct __assoc *next; + char *key; + void *value; //char *value; } * chain_t; -typedef chain_t* map_t; +typedef chain_t *map_t; -typedef struct __dict{ +typedef struct __dict +{ unsigned int cap; map_t map; unsigned int size; -}* dictionary_t; +} * dictionary_t; dictionary_t dict(); dictionary_t dict_n(unsigned int n); -chain_t dlookup(dictionary_t,const char*); -void* dvalue(dictionary_t, const char*); -chain_t dput(dictionary_t,const char*, void*); -chain_t dremove(dictionary_t, const char*); +chain_t dlookup(dictionary_t, const char *); +void *dvalue(dictionary_t, const char *); +chain_t dput(dictionary_t, const char *, void *); +chain_t dremove(dictionary_t, const char *); void freedict(dictionary_t); #endif \ No newline at end of file diff --git a/lib/handle.c b/lib/handle.c index 446f507..1ebb29c 100644 --- a/lib/handle.c +++ b/lib/handle.c @@ -107,8 +107,8 @@ int compressable(char *ctype) void htdocs(antd_request_t *rq, char *dest) { - dictionary_t xheader = (dictionary_t)dvalue(rq->request, "REQUEST_HEADER"); - char *www = (char *)dvalue(xheader, "SERVER_WWW_ROOT"); + //dictionary_t xheader = (dictionary_t)dvalue(rq->request, "REQUEST_HEADER"); + char *www = (char *)dvalue(rq->request, "SERVER_WWW_ROOT"); if (www) { strcpy(dest, www); @@ -527,7 +527,7 @@ int antd_send(void *src, const void *data_in, int len_in) writelen = (len - written) > BUFFLEN ? BUFFLEN : (len - written); time(&source->last_io); } - else if (difftime(time(NULL), source->last_io) > MAX_IO_WAIT_TIME || (count == -1 && errno != EAGAIN && errno != EWOULDBLOCK)) + else if ((difftime(time(NULL), source->last_io) > MAX_IO_WAIT_TIME) || (count == -1 && errno != EAGAIN && errno != EWOULDBLOCK)) { if (written == 0) written = count; @@ -542,6 +542,76 @@ int antd_send(void *src, const void *data_in, int len_in) return written; } +/** + * Read up to n bytes, not guaranty to have exactly nbytes + * - return -1 if false + * */ +int antd_recv_upto(void *src, void *data, int len) +{ + if (!src) + return -1; + int received = 0; + antd_client_t *source = (antd_client_t *)src; +#ifdef USE_OPENSSL + if (source->ssl) + { + ERR_clear_error(); + received = SSL_read(source->ssl, data, len); + int err = SSL_get_error(source->ssl, received); + if (received > 0) + { + time(&source->last_io); + return received; + } + else + { + switch (err) + { + case SSL_ERROR_NONE: + { + return 0; + } + + case SSL_ERROR_ZERO_RETURN: + { + // peer disconnected... + return -1; + } + + case SSL_ERROR_WANT_READ: + { + return 0; + } + + case SSL_ERROR_WANT_WRITE: + { + return 0; + } + + default: + return -1; + } + } + } + else + { +#endif + received = recv(((int)source->sock), data, len, 0); + //LOG("Read : %c\n", *ptr); + if (received > 0) + { + time(&source->last_io); + return received; + } + if (received == 0 || (errno != EAGAIN && errno != EWOULDBLOCK)) + { + return -1; + } + return 0; +#ifdef USE_OPENSSL + } +#endif +} int antd_recv(void *src, void *data, int len) { if (!src) @@ -688,7 +758,8 @@ int antd_recv(void *src, void *data, int len) time(&source->last_io); //LOG("Read len is %d\n", readlen); } - else if (difftime(time(NULL), source->last_io) > MAX_IO_WAIT_TIME || (errno != EAGAIN && errno != EWOULDBLOCK)) + else if ((read == 0) || + difftime(time(NULL), source->last_io) > MAX_IO_WAIT_TIME || (errno != EAGAIN && errno != EWOULDBLOCK)) { //ERROR("Error while reading: %s", strerror(errno)); if (read == 0) diff --git a/lib/handle.h b/lib/handle.h index 5af3ed2..2dcd3a1 100644 --- a/lib/handle.h +++ b/lib/handle.h @@ -132,6 +132,7 @@ int ws_enable(dictionary_t); int read_buf(void *sock, char *buf, int i); int antd_send(void *source, const void *data, int len); int antd_recv(void *source, void *data, int len); +int antd_recv_upto(void* src, void* data, int len); int antd_close(void *source); void destroy_request(void *data); #endif diff --git a/lib/scheduler.c b/lib/scheduler.c index 5a98b72..a2b8c6e 100644 --- a/lib/scheduler.c +++ b/lib/scheduler.c @@ -694,7 +694,7 @@ void *antd_scheduler_wait(void *ptr) antd_queue_item_t it = NULL; antd_queue_item_t curr = NULL; antd_task_evt_item_t *eit = NULL; - bst_node_t* node = NULL; + bst_node_t* node, *task_node = NULL; struct pollfd *pfds = NULL; antd_scheduler_t *scheduler = (antd_scheduler_t *)ptr; @@ -745,6 +745,7 @@ void *antd_scheduler_wait(void *ptr) for (int i = 0; i < pollsize; i++) { // find the event + task_node = NULL; node = bst_find(poll_list,i); if(node) eit = (antd_task_evt_item_t *)node->data; @@ -755,9 +756,12 @@ void *antd_scheduler_wait(void *ptr) ) { // event triggered schedule the task pthread_mutex_lock(&scheduler->scheduler_lock); - scheduler->task_queue = bst_delete(scheduler->task_queue, eit->task->id); + task_node = bst_find(scheduler->task_queue, eit->task->id); + if(task_node) + scheduler->task_queue = bst_delete(scheduler->task_queue, eit->task->id); pthread_mutex_unlock(&scheduler->scheduler_lock); - antd_task_schedule(scheduler, eit->task); + if(task_node) + antd_task_schedule(scheduler, eit->task); } else if( (pfds[i].revents & POLLERR) || (pfds[i].revents & POLLHUP) ) { // task is no longer available @@ -782,6 +786,8 @@ void *antd_scheduler_wait(void *ptr) if (!scheduler->task_queue) { + // reset id allocator + //scheduler->id_allocator=0; // no task found, go to idle state sem_wait(scheduler->scheduler_sem); } @@ -805,6 +811,7 @@ int antd_scheduler_next_id(antd_scheduler_t *sched, int input) while (bst_find(sched->task_queue, id) != NULL) { + sched->id_allocator++; id = sched->id_allocator; } pthread_mutex_unlock(&sched->scheduler_lock); diff --git a/lib/utils.c b/lib/utils.c index 8770d2f..dcf4599 100644 --- a/lib/utils.c +++ b/lib/utils.c @@ -34,6 +34,8 @@ THE SOFTWARE. #include #include #include +#include +#include //hostent #ifdef USE_OPENSSL #include @@ -594,4 +596,60 @@ int guard_write(int fd, void* buffer, size_t size) n += st; } return n; +} + +/* +send a request +*/ +int request_socket(const char *ip, int port) +{ + int sockfd; + struct sockaddr_in dest; + + if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) + { + ERROR("Socket: %s", strerror(errno)); + return -1; + } + /*struct linger lingerStruct; + lingerStruct.l_onoff = 0; // turn lingering off for sockets + setsockopt(sockfd, SOL_SOCKET, SO_LINGER, &lingerStruct, sizeof(lingerStruct));*/ + + bzero(&dest, sizeof(dest)); + dest.sin_family = AF_INET; + dest.sin_port = htons(port); + if (inet_aton(ip, &dest.sin_addr) == 0) + { + perror(ip); + close(sockfd); + return -1; + } + if (connect(sockfd, (struct sockaddr *)&dest, sizeof(dest)) != 0) + { + close(sockfd); + ERROR("Connect:%s", strerror(errno)); + return -1; + } + return sockfd; +} + +char* ip_from_hostname(const char *hostname) +{ + struct hostent *he; + struct in_addr **addr_list; + int i; + if ((he = gethostbyname(hostname)) == NULL) + { + // get the host info + ERROR("gethostbyname:%s", strerror(errno)); + return NULL; + } + addr_list = (struct in_addr **)he->h_addr_list; + + for (i = 0; addr_list[i] != NULL; i++) + { + //Return the first one; + return inet_ntoa(*addr_list[i]); + } + return NULL; } \ No newline at end of file diff --git a/lib/utils.h b/lib/utils.h index de97369..b84adc4 100644 --- a/lib/utils.h +++ b/lib/utils.h @@ -96,4 +96,6 @@ void digest_to_hex(const uint8_t *, char *); void verify_header(char* k); int guard_read(int fd, void* buffer, size_t size); int guard_write(int fd, void* buffer, size_t size); +int request_socket(const char *ip, int port); +char* ip_from_hostname(const char *hostname); #endif diff --git a/lib/ws.c b/lib/ws.c index 40cec80..fa8459a 100644 --- a/lib/ws.c +++ b/lib/ws.c @@ -3,7 +3,6 @@ #include #include #include -#include //hostent #ifdef USE_OPENSSL #include #include @@ -348,71 +347,6 @@ int ws_send_close(void *client, unsigned int status, int mask) //_send_header(client, header); //send(client,bytes,2,0); } -int ip_from_hostname(const char *hostname, char *ip) -{ - struct hostent *he; - struct in_addr **addr_list; - int i; - if ((he = gethostbyname(hostname)) == NULL) - { - // get the host info - ERROR("gethostbyname:%s", strerror(errno)); - return -1; - } - addr_list = (struct in_addr **)he->h_addr_list; - - for (i = 0; addr_list[i] != NULL; i++) - { - //Return the first one; - strcpy(ip, inet_ntoa(*addr_list[i])); - return 0; - } - return -1; -} - -/* -send a request -*/ -int request_socket(const char *ip, int port) -{ - int sockfd; - struct sockaddr_in dest; - - // time out setting - struct timeval timeout; - timeout.tv_sec = CONN_TIME_OUT_S; - timeout.tv_usec = 0; //3 s - if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) - { - ERROR("Socket: %s", strerror(errno)); - return -1; - } - if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout, sizeof(timeout)) < 0) - ERROR("setsockopt failed:%s", strerror(errno)); - - if (setsockopt(sockfd, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout, sizeof(timeout)) < 0) - ERROR("setsockopt failed:%s", strerror(errno)); - /*struct linger lingerStruct; - lingerStruct.l_onoff = 0; // turn lingering off for sockets - setsockopt(sockfd, SOL_SOCKET, SO_LINGER, &lingerStruct, sizeof(lingerStruct));*/ - - bzero(&dest, sizeof(dest)); - dest.sin_family = AF_INET; - dest.sin_port = htons(port); - if (inet_aton(ip, &dest.sin_addr) == 0) - { - perror(ip); - close(sockfd); - return -1; - } - if (connect(sockfd, (struct sockaddr *)&dest, sizeof(dest)) != 0) - { - close(sockfd); - ERROR("Connect:%s", strerror(errno)); - return -1; - } - return sockfd; -} void ws_client_close(ws_client_t *wsclient) { @@ -437,9 +371,8 @@ void ws_client_close(ws_client_t *wsclient) //this is for the client side, not use for now int ws_client_connect(ws_client_t *wsclient, port_config_t pcnf) { - char ip[100]; - int stat = ip_from_hostname(wsclient->host, ip); - if (stat == -1) + char* ip = ip_from_hostname(wsclient->host); + if (ip == NULL) return -1; int sock = request_socket(ip, pcnf.port); if (sock <= 0) @@ -447,6 +380,16 @@ int ws_client_connect(ws_client_t *wsclient, port_config_t pcnf) ERROR("Cannot request socket"); return -1; } + // time out setting + struct timeval timeout; + timeout.tv_sec = CONN_TIME_OUT_S; + timeout.tv_usec = 0; //3 s + if (setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout, sizeof(timeout)) < 0) + ERROR("setsockopt failed:%s", strerror(errno)); + + if (setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout, sizeof(timeout)) < 0) + ERROR("setsockopt failed:%s", strerror(errno)); + // will be free wsclient->antdsock->sock = sock; wsclient->antdsock->z_status = 0; diff --git a/lib/ws.h b/lib/ws.h index 2e814e4..7fff10f 100644 --- a/lib/ws.h +++ b/lib/ws.h @@ -57,8 +57,6 @@ int ws_send_file(void *client, const char *file, int mask); int ws_send_binary(void *client, uint8_t *data, int l, int mask); int ws_read_data(void *, ws_msg_header_t *, int, uint8_t *); -int request_socket(const char *ip, int port); -int ip_from_hostname(const char *hostname, char *ip); //int ws_open_hand_shake(const char* host, int port, const char* resource); char *get_ip_address();