diff --git a/dist/antd-1.0.6b.tar.gz b/dist/antd-1.0.6b.tar.gz index 5816229..aee183c 100644 Binary files a/dist/antd-1.0.6b.tar.gz and b/dist/antd-1.0.6b.tar.gz differ diff --git a/http_server.c b/http_server.c index f28b544..5a884f4 100644 --- a/http_server.c +++ b/http_server.c @@ -758,60 +758,66 @@ static void *proxy_monitor(void *data) antd_client_t *proxy = (antd_client_t *)dvalue(rq->request, "PROXY_HANDLE"); antd_task_t* task = antd_create_task(NULL, data, NULL, rq->client->last_io); int ret, max_fd; - fd_set read_flags; + fd_set read_flags, write_flags; // first verify if the socket is ready FD_ZERO(&read_flags); FD_SET(rq->client->sock, &read_flags); FD_SET(proxy->sock, &read_flags); - //FD_ZERO(&write_flags); - //FD_SET(rq->client->sock, &write_flags); + FD_ZERO(&write_flags); + FD_SET(rq->client->sock, &write_flags); //FD_SET(proxy->sock, &write_flags); char *buf = NULL; struct timeval timeout; timeout.tv_sec = 0; - timeout.tv_usec = 500; + timeout.tv_usec = 5000; max_fd = proxy->sock > rq->client->sock ? proxy->sock: rq->client->sock; - + buf = (char *)malloc(BUFFLEN); + //printf("start proxy monitor\n"); // select - ret = select(max_fd + 1, &read_flags, NULL, (fd_set *)0, &timeout); + do + { + ret = select(max_fd + 1, &read_flags, &write_flags, (fd_set *)0, &timeout); + if(ret > 0) + { + + memset(buf, '\0', BUFFLEN); + if (FD_ISSET(rq->client->sock, &read_flags) || FD_ISSET(rq->client->sock, &write_flags)) + { + ret = antd_recv_upto(rq->client, buf, BUFFLEN); + if(ret == -1) + { + free(buf); + (void)close(proxy->sock); + return task; + } + antd_send(proxy, buf, ret); + } + if (FD_ISSET(proxy->sock, &read_flags)) + { + ret = antd_recv_upto(proxy, buf, BUFFLEN); + if(ret == -1) + { + free(buf); + (void)close(proxy->sock); + return task; + } + antd_send(rq->client, buf, ret); + } + } + } while (ret > 0); + free(buf); + //printf("monitor return: %d\n", ret); if(ret == -1) { - //antd_error(rq->client, 500, ""); (void)close(proxy->sock); return task; } - if(ret > 0) - { - buf = (char *)malloc(BUFFLEN); - if (FD_ISSET(rq->client->sock, &read_flags)) - { - ret = antd_recv_upto(rq->client, buf, BUFFLEN); - if(ret == -1) - { - free(buf); - (void)close(proxy->sock); - return task; - } - antd_send(proxy, buf, ret); - } - if (FD_ISSET(proxy->sock, &read_flags)) - { - ret = antd_recv_upto(proxy, buf, BUFFLEN); - if(ret == -1) - { - free(buf); - (void)close(proxy->sock); - return task; - } - antd_send(rq->client, buf, ret); - } - free(buf); - } + task->handle = proxy_monitor; task->access_time = rq->client->last_io; // register event - antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_READABLE); + antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_READABLE | TASK_EVT_ON_WRITABLE); antd_task_bind_event(task, proxy->sock, 0, TASK_EVT_ON_READABLE); return task; } @@ -840,6 +846,7 @@ static void *proxify(void *data) antd_error(rq->client, 503, "Service Unavailable"); return task; } + set_nonblock(sock_fd); proxy = (antd_client_t *)malloc(sizeof(antd_client_t)); proxy->sock = sock_fd; proxy->ssl = NULL; @@ -871,12 +878,11 @@ static void *proxify(void *data) } } (void)antd_send(proxy, "\r\n", 2); - // now monitor the proxy task->handle = proxy_monitor; task->access_time = rq->client->last_io; // register event - antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_READABLE); + antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_READABLE | TASK_EVT_ON_WRITABLE); antd_task_bind_event(task, proxy->sock, 0, TASK_EVT_ON_READABLE); return task; } @@ -1121,8 +1127,15 @@ void *decode_post_request(void *data) if (strstr(ctype, FORM_URL_ENCODE)) { char *pquery = post_data_decode(rq->client, clen); - decode_url_request(pquery, request); - free(pquery); + if(pquery) + { + decode_url_request(pquery, request); + free(pquery); + } + else if(clen > 0) + { + task->handle = decode_post_request; + } } else if (strstr(ctype, FORM_MULTI_PART)) { @@ -1132,6 +1145,7 @@ void *decode_post_request(void *data) else { char *pquery = post_data_decode(rq->client, clen); + //printf("POST: %s\n", pquery); char *key = strstr(ctype, "/"); if (key) key++; @@ -1142,6 +1156,10 @@ void *decode_post_request(void *data) dput(request, key, strdup(pquery)); free(pquery); } + else if(clen > 0) + { + task->handle = decode_post_request; + } } return task; } @@ -1439,7 +1457,7 @@ char *post_data_decode(void *client, int len) int read = 0, stat = 1; while (readlen > 0 && stat > 0) { - stat = antd_recv(client, ptr + read, readlen); + stat = antd_recv_upto(client, ptr + read, readlen); if (stat > 0) { read += stat; diff --git a/httpd.c b/httpd.c index e487ee6..59aeb7f 100644 --- a/httpd.c +++ b/httpd.c @@ -302,12 +302,12 @@ void antd_scheduler_destroy_data(void *data) int antd_task_data_id(void *data) { - antd_request_t *rq = (antd_request_t *)data; + /*antd_request_t *rq = (antd_request_t *)data; if(!rq) return 0; - return antd_scheduler_next_id(scheduler,rq->client->sock); - //UNUSED(data); - //return antd_scheduler_next_id(scheduler,0); + return antd_scheduler_next_id(scheduler,rq->client->sock);*/ + UNUSED(data); + return antd_scheduler_next_id(scheduler,0); } int main(int argc, char *argv[]) diff --git a/lib/scheduler.c b/lib/scheduler.c index a2b8c6e..1796ef2 100644 --- a/lib/scheduler.c +++ b/lib/scheduler.c @@ -645,7 +645,6 @@ static void task_event_collect(bst_node_t* node, void** argv, int argc) antd_queue_t* exec_list = (antd_queue_t*) argv[0]; bst_node_t** poll_list = (bst_node_t**) argv[1]; int* pollsize = (int*) argv[2]; - if(!task->events) { enqueue(exec_list, task); @@ -805,7 +804,7 @@ int antd_scheduler_next_id(antd_scheduler_t *sched, int input) pthread_mutex_lock(&sched->scheduler_lock); if (id == 0) { - sched->id_allocator++; + sched->id_allocator++; id = sched->id_allocator; }