diff --git a/Makefile.am b/Makefile.am index ad67d06..2dbe5e5 100644 --- a/Makefile.am +++ b/Makefile.am @@ -21,7 +21,8 @@ libantd_la_SOURCES = lib/ini.c \ lib/ws.c \ lib/sha1.c \ lib/list.c \ - lib/scheduler.c + lib/scheduler.c \ + lib/h2.c pkginclude_HEADERS = lib/ini.h \ lib/handle.h \ @@ -32,7 +33,8 @@ pkginclude_HEADERS = lib/ini.h \ lib/sha1.h \ lib/list.h \ lib/scheduler.h \ - lib/plugin.h + lib/plugin.h \ + lib/h2.h EXTRA_DIST = plugin_manager.h http_server.h README.md LICENSE antd-config.ini diff --git a/http_server.c b/http_server.c index 924f4f6..038cb5f 100644 --- a/http_server.c +++ b/http_server.c @@ -1,5 +1,5 @@ #include "http_server.h" - +#include "lib/h2.h" //define all basic mime here static mime_t _mimes[] = { {"image/bmp","bmp"}, @@ -229,7 +229,7 @@ void load_config(const char *file) // put it default mimes for(int i = 0; _mimes[i].type != NULL; i++) { - dput(server_config.mimes,_mimes[i].type, strdup(_mimes[i].ext)); + dput_static(server_config.mimes,_mimes[i].type, (void*)_mimes[i].ext); } if (ini_parse(file, config_handler, &server_config) < 0) { @@ -254,9 +254,6 @@ void load_config(const char *file) void *accept_request(void *data) { - char buf[BUFFLEN]; - char *token = NULL; - char *line = NULL; antd_task_t *task; antd_request_t *rq = (antd_request_t *)data; @@ -281,22 +278,13 @@ void *accept_request(void *data) } if (sel == 0 || (!FD_ISSET(client->sock, &read_flags) && !FD_ISSET(client->sock, &write_flags))) { - /*if(client->last_wait == 0) client->last_wait = time(NULL); - // retry it later - if(time(NULL) - client->last_wait > MAX_WAIT_S) - { - LOG("Read and write timeout, give up on %d\n", client->sock); - server_config.connection++; - unknow(rq->client); - return task; - }*/ task->handle = accept_request; return task; } // perform the ssl handshake if enabled #ifdef USE_OPENSSL int ret = -1, stat; - if (client->ssl && client->status == 0) + if (client->ssl && !(client->flags & CLIENT_FL_ACCEPTED) ) { //LOG("Atttempt %d\n", client->attempt); if (SSL_accept((SSL *)client->ssl) == -1) @@ -307,27 +295,17 @@ void *accept_request(void *data) case SSL_ERROR_WANT_READ: case SSL_ERROR_WANT_WRITE: case SSL_ERROR_NONE: - //LOG("RETRY SSL %d\n", client->sock); - /*if(client->last_wait == 0) client->last_wait = time(NULL); - if(time(NULL) - client->last_wait > MAX_WAIT_S) - { - server_config.connection++; - unknow(rq->client); - LOG("SSL timeout, give up on %d\n", client->sock); - return task; - } - task->status = TASK_ACCEPT_SSL_CONT;*/ task->handle = accept_request; return task; default: ERROR("Error performing SSL handshake %d %d %s", stat, ret, ERR_error_string(ERR_get_error(), NULL)); antd_error(rq->client, 400, "Invalid SSL request"); - //server_config.connection++; + ERR_print_errors_fp(stderr); return task; } } - client->status = 1; + client->flags |= CLIENT_FL_ACCEPTED; task->handle = accept_request; //LOG("Handshake finish for %d\n", client->sock); return task; @@ -336,54 +314,22 @@ void *accept_request(void *data) { if (!FD_ISSET(client->sock, &read_flags)) { - /*if(client->last_wait == 0) client->last_wait = time(NULL); - if(time(NULL) - client->last_wait > MAX_WAIT_S) - { - server_config.connection++; - unknow(rq->client); - LOG("Read timeout, give up on %d\n", client->sock); - return task; - }*/ + client->flags |= CLIENT_FL_ACCEPTED; task->handle = accept_request; return task; } } #endif - //LOG("Ready for reading %d\n", client->sock); - //server_config.connection++; - read_buf(rq->client, buf, sizeof(buf)); - line = buf; - // get the method string - token = strsep(&line, " "); - if (!line) + //printf("Flag: %d\n", client->flags); + // now return the task base on the http version + if(client->flags & CLIENT_FL_HTTP_1_1) { - //LOG("No method found"); - antd_error(rq->client, 405, "No method found"); - return task; + task->handle = decode_request_header; } - trim(token, ' '); - trim(line, ' '); - dput(rq->request, "METHOD", strdup(token)); - // get the request - token = strsep(&line, " "); - if (!line) + else { - //LOG("No request found"); - antd_error(rq->client, 400, "Bad request"); - return task; + task->handle = antd_h2_preface_ck; } - trim(token, ' '); - trim(line, ' '); - trim(line, '\n'); - trim(line, '\r'); - dput(rq->request, "PROTOCOL", strdup(line)); - dput(rq->request, "REQUEST_QUERY", strdup(token)); - line = token; - token = strsep(&line, "?"); - dput(rq->request, "REQUEST_PATH", url_decode(token)); - // decode request - // now return the task - task->handle = decode_request_header; return task; } @@ -460,7 +406,7 @@ void *resolve_request(void *data) // find an handler plugin to process it // if the plugin is not found, forbidden access to the file should be sent char *mime_type = mime(path); - dput(rq->request, "RESOURCE_MIME", strdup(mime_type)); + dput_static(rq->request, "RESOURCE_MIME", mime_type); if (strcmp(mime_type, "application/octet-stream") == 0) { char *ex = ext(path); @@ -639,7 +585,7 @@ void *serve_file(void *data) rhd.cookie = NULL; rhd.status = 200; rhd.header = dict(); - dput(rhd.header, "Content-Type", strdup(mime_type)); + dput_static(rhd.header, "Content-Type", mime_type); #ifdef USE_ZLIB if(!compressable(mime_type) || rq->client->z_level == ANTD_CNONE) #endif @@ -647,7 +593,7 @@ void *serve_file(void *data) gmtime_r(&st.st_ctime, &tm); strftime(ibuf, 255, "%a, %d %b %Y %H:%M:%S GMT", &tm); dput(rhd.header, "Last-Modified", strdup(ibuf)); - dput(rhd.header, "Cache-Control", strdup("no-cache")); + dput_static(rhd.header, "Cache-Control", "no-cache"); antd_send_header(rq->client, &rhd); __f(rq->client, path); @@ -745,12 +691,48 @@ void *decode_request_header(void *data) char *query = NULL; char *host = NULL; char buf[2 * BUFFLEN]; + // read the first line + + //server_config.connection++; + read_buf(rq->client, buf, sizeof(buf)); + line = buf; + trim(line, '\n'); + trim(line, '\r'); + // get the method string + token = strsep(&line, " "); + if (!line) + { + //LOG("No method found"); + antd_error(rq->client, 405, "No method found"); + return antd_create_task(NULL, (void *)rq, NULL,rq->client->last_io); + } + trim(token, ' '); + trim(line, ' '); + dput(rq->request, "METHOD", strdup(token)); + // get the request + token = strsep(&line, " "); + if (!line) + { + //LOG("No request found"); + antd_error(rq->client, 400, "Bad request"); + return antd_create_task(NULL, (void *)rq, NULL,rq->client->last_io); + } + trim(token, ' '); + trim(line, ' '); + trim(line, '\n'); + trim(line, '\r'); + dput(rq->request, "PROTOCOL", strdup(line)); + dput(rq->request, "REQUEST_QUERY", strdup(token)); + line = token; + token = strsep(&line, "?"); + dput(rq->request, "REQUEST_PATH", url_decode(token)); + char *url = (char *)dvalue(rq->request, "REQUEST_QUERY"); dictionary_t xheader = dvalue(rq->request, "REQUEST_HEADER"); dictionary_t request = dvalue(rq->request, "REQUEST_DATA"); char* port_s = (char*) dvalue(xheader, "SERVER_PORT"); port_config_t* pcnf = (port_config_t*)dvalue(server_config.ports, port_s); - // first real all header + // this for check if web socket is enabled while ((read_buf(rq->client, buf, sizeof(buf))) && strcmp("\r\n", buf)) { @@ -864,7 +846,7 @@ void *decode_request(void *data) // insert wsocket flag to request // plugin should handle this ugraded connection // not the server - dput(rq->request, "__web_socket__", strdup("1")); + dput_static(rq->request, "__web_socket__", "1"); } // resolve task task->handle = resolve_request; @@ -929,8 +911,8 @@ void *decode_post_request(void *data) key = ctype; if(pquery) { - dput(request, key, strdup(pquery)); - free(pquery); + dput(request, key, pquery); + //free(pquery); } } return task; diff --git a/http_server.h b/http_server.h index c5c97a4..183dfa9 100644 --- a/http_server.h +++ b/http_server.h @@ -24,9 +24,7 @@ void destroy_config(); void load_config(const char* file); void* accept_request(void*); void* finish_request(void*); -void cat(void*, FILE *); -void cannot_execute(void*); -//int get_line(int, char *, int); + void* serve_file(void*); int startup(unsigned *); int rule_check(const char*, const char*, const char* , const char* , const char* , char*); @@ -41,7 +39,6 @@ void* decode_multi_part_request(void*,const char*); void* decode_multi_part_request_data(void* data); void decode_cookie(const char*, dictionary_t d); char* post_data_decode(void*,int); -void set_nonblock(int); void* execute_plugin(void* data, const char *path); #endif \ No newline at end of file diff --git a/httpd.c b/httpd.c index c142f59..823b10d 100644 --- a/httpd.c +++ b/httpd.c @@ -2,6 +2,7 @@ #include #include "http_server.h" #include "lib/ini.h" +#define MAX_VALIDITY_INTERVAL 20 static antd_scheduler_t scheduler; @@ -42,7 +43,8 @@ SSL_CTX *create_context() } #if OPENSSL_VERSION_NUMBER >= 0x10002000L static unsigned char antd_protocols[] = { - //TODO: add support to HTTP/2 protocol: 2,'h', '2', + //TODO: add support to HTTP/2 protocol: + 2,'h', '2', 8, 'h', 't', 't', 'p', '/', '1', '.', '1' }; static int alpn_advertise_protos_cb(SSL *ssl, const unsigned char **out, unsigned int *outlen,void *arg) @@ -55,10 +57,28 @@ static int alpn_advertise_protos_cb(SSL *ssl, const unsigned char **out, unsigne } static int alpn_select_cb(SSL *ssl, const unsigned char **out, unsigned char *outlen, const unsigned char *in, unsigned int inlen, void *arg) { - UNUSED(ssl); UNUSED(arg); + char buf[64]; if(SSL_select_next_proto((unsigned char **)out, outlen,antd_protocols,sizeof(antd_protocols),in, inlen) == OPENSSL_NPN_NEGOTIATED) { + // set client flag to indicate protocol + int sock = SSL_get_fd(ssl); + if(sock <= 0) + { + return SSL_TLSEXT_ERR_ALERT_FATAL; + } + + antd_client_t* client = SSL_get_ex_data(ssl, sock); + if(!client) + { + return SSL_TLSEXT_ERR_ALERT_FATAL; + } + memcpy(buf,*out,*outlen); + buf[*outlen] = '\0'; + if(strcmp(buf,"http/1.1") !=0 ) + { + client->flags &= ~CLIENT_FL_HTTP_1_1; + } return SSL_TLSEXT_ERR_OK; } else @@ -120,6 +140,11 @@ void configure_context(SSL_CTX *ctx) #endif +void schedule_task(antd_task_t* task) +{ + antd_add_task(&scheduler, task); +} + void stop_serve(int dummy) { UNUSED(dummy); @@ -150,6 +175,52 @@ void stop_serve(int dummy) { sigprocmask(SIG_UNBLOCK, &mask, NULL); } + +static int validate_data(antd_task_t* task) +{ + if(difftime( time(NULL), task->access_time) > MAX_VALIDITY_INTERVAL) + return 0; + return 1; +} + +static int is_task_ready(antd_task_t* task) +{ + antd_request_t* rq = (antd_request_t*)task->data; + if(!rq) return 0; + // check if data is ready for read/write + fd_set read_flags, write_flags; + struct timeval timeout; + FD_ZERO(&read_flags); + FD_SET(rq->client->sock, &read_flags); + FD_ZERO(&write_flags); + FD_SET(rq->client->sock, &write_flags); + timeout.tv_sec = 0; + timeout.tv_usec = 0; + int sel = select(rq->client->sock + 1, &read_flags, &write_flags, (fd_set *)0, &timeout); + if(sel > 0 && (FD_ISSET(rq->client->sock, &read_flags)|| FD_ISSET(rq->client->sock, &write_flags))) + { + if(FD_ISSET(rq->client->sock, &read_flags)) + { + rq->client->flags |= CLIENT_FL_READABLE; + } + else + { + rq->client->flags &= ~CLIENT_FL_READABLE; + } + + if(FD_ISSET(rq->client->sock, &write_flags)) + { + rq->client->flags |= CLIENT_FL_WRITABLE; + } + else + { + rq->client->flags &= ~CLIENT_FL_WRITABLE; + } + return 1; + } + return 0; +} + int main(int argc, char* argv[]) { // load the config first @@ -212,15 +283,16 @@ int main(int argc, char* argv[]) } // default to 4 workers antd_scheduler_init(&scheduler, conf->n_workers); - scheduler.validate_data = 1; + scheduler.validate_data = validate_data; scheduler.destroy_data = finish_request; - // use blocking server_sock + scheduler.task_ready = is_task_ready; + + // make the scheduler wait for event on another thread // this allow to ged rid of high cpu usage on // endless loop without doing anything - // set_nonblock(server_sock); pthread_t scheduler_th; - if (pthread_create(&scheduler_th, NULL,(void *(*)(void *))antd_wait, (void*)&scheduler) != 0) + if (pthread_create(&scheduler_th, NULL,(void *(*)(void *))antd_scheduler_wait, (void*)&scheduler) != 0) { ERROR("pthread_create: cannot create worker"); stop_serve(0); @@ -242,7 +314,6 @@ int main(int argc, char* argv[]) { if(conf->connection > conf->maxcon) { - //ERROR("Reach max connection %d", conf->connection); timeout.tv_sec = 0; timeout.tv_usec = 10000; // 5 ms select(0, NULL, NULL, NULL, &timeout); @@ -272,7 +343,7 @@ int main(int argc, char* argv[]) request->request = dict(); client->zstream = NULL; client->z_level = ANTD_CNONE; - + dictionary_t xheader = dict(); dput(request->request, "REQUEST_HEADER", xheader); dput(request->request, "REQUEST_DATA", dict()); @@ -305,8 +376,9 @@ int main(int argc, char* argv[]) client->sock = client_sock; time(&client->last_io); client->ssl = NULL; + // default selected protocol is http/1.1 + client->flags = CLIENT_FL_HTTP_1_1; #ifdef USE_OPENSSL - client->status = 0; if(pcnf->usessl == 1) { client->ssl = (void*)SSL_new(ctx); @@ -318,12 +390,6 @@ int main(int argc, char* argv[]) { ERROR("Cannot set ex data to ssl client:%d", client->sock); } - /*if (SSL_accept((SSL*)client->ssl) <= 0) { - LOG("EROOR accept\n"); - ERR_print_errors_fp(stderr); - antd_close(client); - continue; - }*/ } #endif conf->connection++; diff --git a/lib/dictionary.c b/lib/dictionary.c index 961ed90..3a389a6 100644 --- a/lib/dictionary.c +++ b/lib/dictionary.c @@ -23,6 +23,7 @@ THE SOFTWARE. */ #include "dictionary.h" #include "utils.h" +#include "list.h" dictionary_t dict() { @@ -66,8 +67,13 @@ chain_t __put_el_with_key(dictionary_t dic, const char* key) if(dic->map == NULL) return NULL; if ((np = dlookup(dic,key)) == NULL) { /* not found */ np = (chain_t) malloc(sizeof(*np)); - if (np == NULL || (np->key = strdup(key)) == NULL) + if (np == NULL) return NULL; + if((np->key = strdup(key)) == NULL) + { + free(np); + return NULL; + } np->value = NULL; hashval = hash(key, dic->cap); np->next = dic->map[hashval]; @@ -77,18 +83,43 @@ chain_t __put_el_with_key(dictionary_t dic, const char* key) // found return np; } -chain_t dput(dictionary_t dic,const char* key, void* value) + +static void free_ditem_value(void* value, antd_dict_item_type_t type) +{ + switch (type) + { + case ANTD_DI_HEAP: + if(value) + free(value); + break; + case ANTD_DI_LIST: + if(value) + list_free((list_t*)&value); + break; + case ANTD_DI_DIC: + if(value) + freedict(value); + default: + break; + } +} + +chain_t insert(dictionary_t dic,const char* key, void* value, antd_dict_item_type_t type) { chain_t np = __put_el_with_key(dic,key); if(np == NULL) { - if(value) free(value); + free_ditem_value(value, type); return NULL; } - if(np->value && value) free(np->value); + if(np->value && value) free_ditem_value(np->value, np->type); + np->type = type; np->value = value; return np; } + + + chain_t dremove(dictionary_t dic, const char* key) { if(dic->map == NULL) return 0; @@ -131,7 +162,7 @@ void free_association(chain_t * asoc) if(a->key) { free(a->key); - if(a->value) free(a->value); + if(a->value) free_ditem_value(a->value, a->type); } free(a); } diff --git a/lib/dictionary.h b/lib/dictionary.h index 9f65b92..5b94054 100644 --- a/lib/dictionary.h +++ b/lib/dictionary.h @@ -25,18 +25,24 @@ THE SOFTWARE. #define DICTIONARY_H #define DHASHSIZE 16 +#define dput(d,k,v) (insert(d,k,v,ANTD_DI_HEAP)) +#define dput_static(d,k,v) (insert(d,k,v,ANTD_DI_STATIC)) +#define dput_list(d,k,v) (insert(d,k,v,ANTD_DI_LIST)) +#define dput_dict(d,k,v) (insert(d,k,v,ANTD_DI_DIC)) #define for_each_assoc(assoc, dic) \ for(unsigned int i = 0; i < dic->cap; i++) \ for(assoc = dic->map[i];assoc!= NULL; assoc = assoc->next) +typedef enum{ANTD_DI_STATIC, ANTD_DI_HEAP, ANTD_DI_LIST, ANTD_DI_DIC} antd_dict_item_type_t; + /** * Dictionary for header */ typedef struct __assoc{ struct __assoc *next; char *key; + antd_dict_item_type_t type; void* value; - //char *value; } * chain_t; typedef chain_t* map_t; @@ -51,7 +57,7 @@ dictionary_t dict(); dictionary_t dict_n(unsigned int n); chain_t dlookup(dictionary_t,const char*); void* dvalue(dictionary_t, const char*); -chain_t dput(dictionary_t,const char*, void*); +chain_t insert(dictionary_t,const char*, void*, antd_dict_item_type_t type); chain_t dremove(dictionary_t, const char*); void freedict(dictionary_t); diff --git a/lib/h2.c b/lib/h2.c new file mode 100644 index 0000000..e84f22a --- /dev/null +++ b/lib/h2.c @@ -0,0 +1,101 @@ +#include "h2.h" +#include "scheduler.h" + +void* antd_h2_preface_ck(void* data) +{ + char buf[25]; + antd_request_t* rq = (antd_request_t*) data; + int count = antd_recv(rq->client,buf,24); + if(count != 24) + { + // TODO servers MUST treat an invalid connection preface as a + // connection error (Section 5.4.1) of type PROTOCOL_ERROR + ERROR("Unable to read preface for client %d: [%s]",rq->client->sock,buf); + return antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); + } + buf[24] = '\0'; + if(strcmp(buf, H2_CONN_PREFACE) != 0) + { + ERROR("Connection preface is not correct for client %d: [%s]",rq->client->sock,buf); + // TODO servers MUST treat an invalid connection preface as a + // connection error (Section 5.4.1) of type PROTOCOL_ERROR + return antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); + } + return antd_create_task(antd_h2_handle, (void *)rq, NULL, rq->client->last_io); +} + +void* antd_h2_handle(void* data) +{ + antd_request_t* rq = (antd_request_t*) data; + antd_task_t* task; + if(rq->client->flags & CLIENT_FL_READABLE) + { + task = antd_create_task(antd_h2_read,(void *)rq, NULL, rq->client->last_io); + task->priority++; + schedule_task(task); + } + if(rq->client->flags & CLIENT_FL_WRITABLE) + { + task = antd_create_task(antd_h2_write,(void *)rq, NULL, rq->client->last_io); + task->priority++; + schedule_task(task); + } + + task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); + task->priority++; + return task; +} + +static int antd_h2_read_frame(antd_client_t* cl, antd_h2_frame_t* frame) +{ + uint8_t tmp; + frame->length = 0; + frame->type = 0; + frame->flags = 0; + frame->identifier= 0; + if( antd_recv(cl,& frame->length,24) != 24) return 0; + printf("length is %d\n", frame->length); + // TODO: + // Values greater than 2^14 (16,384) MUST NOT be + // sent unless the receiver has set a larger value for + // SETTINGS_MAX_FRAME_SIZE. + if( antd_recv(cl,& frame->type,8) != 8) return 0; + printf("type is %d\n", frame->type); + if( antd_recv(cl,& frame->flags,8) != 8) return 0; + if( antd_recv(cl,& tmp,1) != 1) return 0; + // identifier + if( antd_recv(cl,& frame->identifier,31) != 31) return 0; + frame->data = (uint8_t*) malloc(frame->length); + if(!frame->data) return 0; + if( antd_recv(cl,frame->data, frame->length) != frame->length) + { + free(frame->data); + return 0; + } + return 1; +} + + +void* antd_h2_read(void* data) +{ + antd_h2_frame_t frame; + antd_request_t* rq = (antd_request_t*) data; + antd_task_t* task; + if(!antd_h2_read_frame(rq->client, &frame)) + { + // TODO: frame error + printf("error reading frame\n"); + ERROR("Unable to read frame from client %d",rq->client->sock); + task = antd_create_task(NULL, (void *)rq, NULL, time(NULL)); + task->priority++; + return task; + } + // verify frame + printf("Frame type: %d\n", frame.type & 0xff); + return antd_create_task(NULL, data, NULL, time(NULL)); +} +void* antd_h2_write(void* data) +{ + printf("write task\n"); + return antd_create_task(NULL, data, NULL, time(NULL)); +} \ No newline at end of file diff --git a/lib/h2.h b/lib/h2.h new file mode 100644 index 0000000..f169897 --- /dev/null +++ b/lib/h2.h @@ -0,0 +1,57 @@ +#ifndef HTTP2_H +#define HTTP2_H +#include "handle.h" +#include "hpack.h" + +#define H2_CONN_PREFACE "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" + +#define H2_FRM_DATA 0x0 +#define H2_FRM_HEADER 0x1 +#define H2_FRM_PRIORITY 0x2 +#define H2_FRM_RST_STREAM 0x3 +#define H2_FRM_SETTINGS 0x4 +#define H2_FRM_PUSH_PROMISE 0x5 +#define H2_FRM_PING 0x6 +#define H2_FRM_GOAWAY 0x7 +#define H2_FRM_WINDOW_UPDATE 0x8 +#define H2_FRM_CONTINUATION 0x9 + +/** + * Struct that holds a + * h2 connection +*/ +typedef struct { + +} antd_h2_conn_t; + +/** + * Struct that holds a + * h2 stream +*/ +typedef struct { + +} antd_h2_stream_t; + +/** + * a H2 frame +*/ +typedef struct { + // 24 bits length + unsigned int length; + // 8 bits type + uint8_t type; + // 8 bits flags + uint8_t flags; + // 31 bits identifier + unsigned int identifier; + uint8_t* data; +} antd_h2_frame_t; + +void* antd_h2_read(void* rq); +void* antd_h2_write(void* rq); + +void* antd_h2_preface_ck(void* rq); + +void* antd_h2_handle(void* rq); + +#endif \ No newline at end of file diff --git a/lib/handle.c b/lib/handle.c index 4f1dbd9..048e454 100644 --- a/lib/handle.c +++ b/lib/handle.c @@ -1,4 +1,5 @@ -#include "handle.h" +#include "handle.h" + #define HTML_TPL "%s

%s

" static const char* S_100 = "Continue"; @@ -82,6 +83,11 @@ int compressable(char* ctype) return 0; } +void schedule_task(antd_task_t* task) +{ + UNUSED(task); +} + void htdocs(antd_request_t* rq, char* dest) { dictionary_t xheader = (dictionary_t)dvalue(rq->request, "REQUEST_HEADER"); @@ -210,7 +216,7 @@ void antd_send_header(void* cl, antd_response_header_t* res) } else { - client->status = Z_NO_FLUSH; + //client->status = Z_NO_FLUSH; dput(res->header,"Content-Encoding", strdup("gzip")); } } @@ -224,7 +230,7 @@ void antd_send_header(void* cl, antd_response_header_t* res) } else { - client->status = Z_NO_FLUSH; + //client->status = Z_NO_FLUSH; dput(res->header,"Content-Encoding", strdup("deflate")); } } @@ -282,6 +288,7 @@ int antd_send(void *src, const void* data_in, int len_in) antd_client_t * source = (antd_client_t *) src; #ifdef USE_ZLIB + int status = (source->flags & CLIENT_FL_COMPRESSION_END)?Z_NO_FLUSH:Z_FINISH; if(source->zstream && source->z_level != ANTD_CNONE) { antd_compress_t current_zlevel = source->z_level; @@ -296,7 +303,7 @@ int antd_send(void *src, const void* data_in, int len_in) { zstream->avail_out = BUFFLEN; zstream->next_out = buf; - if(deflate(zstream, source->status) == Z_STREAM_ERROR) + if(deflate(zstream,status) == Z_STREAM_ERROR) { source->z_level = current_zlevel; data = NULL; @@ -643,9 +650,9 @@ int antd_close(void* src) //TODO: send finish data to the socket before quit if(source->zstream) { - if(source->status == Z_NO_FLUSH && source->z_level != ANTD_CNONE) + if(!(source->flags & CLIENT_FL_COMPRESSION_END) && source->z_level != ANTD_CNONE) { - source->status = Z_FINISH; + source->flags |= CLIENT_FL_COMPRESSION_END; antd_send(source, "", 0); } deflateEnd(source->zstream); diff --git a/lib/handle.h b/lib/handle.h index d55038a..91d1bda 100644 --- a/lib/handle.h +++ b/lib/handle.h @@ -21,6 +21,7 @@ #include "dictionary.h" #include "list.h" #include "ini.h" +#include "scheduler.h" #define SERVER_NAME "Antd" #define IS_POST(method) (strcmp(method,"POST")== 0) @@ -37,7 +38,12 @@ typedef enum {ANTD_CGZ, ANTD_CDEFL, ANTD_CNONE} antd_compress_t; -//extern config_t server_config; +// define the client flag +#define CLIENT_FL_ACCEPTED 0x01 +#define CLIENT_FL_COMPRESSION_END 0x02 +#define CLIENT_FL_HTTP_1_1 0x04 +#define CLIENT_FL_READABLE 0x08 +#define CLIENT_FL_WRITABLE 0x10 typedef struct { unsigned int port; @@ -50,9 +56,9 @@ typedef struct { typedef struct{ int sock; void* ssl; - int status; + uint8_t flags; time_t last_io; - // compress + // compress option antd_compress_t z_level; void* zstream; } antd_client_t; @@ -72,7 +78,7 @@ typedef struct -typedef struct { +typedef struct { //int port; char *plugins_dir; char *plugins_ext; @@ -105,14 +111,15 @@ typedef struct { int raw_body; } plugin_header_t; - -int __attribute__((weak)) require_plugin(const char*); +// some functions that allows access to server +// private data +int __attribute__((weak)) require_plugin(const char*); void __attribute__((weak)) htdocs(antd_request_t* rq, char* dest); void __attribute__((weak)) dbdir(char* dest); void __attribute__((weak)) tmpdir(char* dest); void __attribute__((weak)) plugindir(char* dest); - -int __attribute__((weak)) compressable(char* ctype); +int __attribute__((weak)) compressable(char* ctype); +void __attribute__((weak)) schedule_task(antd_task_t* task); void set_nonblock(int socket); //void set_block(int socket); diff --git a/lib/hpack.c b/lib/hpack.c new file mode 100644 index 0000000..bfd44ee --- /dev/null +++ b/lib/hpack.c @@ -0,0 +1 @@ +#include "hpack.h" \ No newline at end of file diff --git a/lib/hpack.h b/lib/hpack.h new file mode 100644 index 0000000..6ed8fc3 --- /dev/null +++ b/lib/hpack.h @@ -0,0 +1,6 @@ +#ifndef HPACK_H +#define HPACK_H + +// HPACK header compression implementation + +#endif \ No newline at end of file diff --git a/lib/scheduler.c b/lib/scheduler.c index 27eff45..e685a59 100644 --- a/lib/scheduler.c +++ b/lib/scheduler.c @@ -155,8 +155,9 @@ void antd_scheduler_init(antd_scheduler_t* scheduler, int n) scheduler->status = 1; scheduler->workers_queue = NULL; scheduler->pending_task = 0 ; - scheduler->validate_data = 0; + scheduler->validate_data = NULL; scheduler->destroy_data = NULL; + scheduler->task_ready = NULL; // init semaphore scheduler->scheduler_sem = sem_open("scheduler", O_CREAT, 0600, 0); if (scheduler->scheduler_sem == SEM_FAILED) @@ -323,7 +324,7 @@ int antd_task_schedule(antd_scheduler_t* scheduler) } // has the task now // validate the task - if(scheduler->validate_data && difftime( time(NULL), it->task->access_time) > MAX_VALIDITY_INTERVAL) + if(scheduler->validate_data && ! scheduler->validate_data(it->task)) { // data task is not valid LOG("Task data is not valid, task will be killed"); @@ -336,6 +337,16 @@ int antd_task_schedule(antd_scheduler_t* scheduler) return 0; } + // check if the task is ready + if(scheduler->task_ready && !scheduler->task_ready(it->task)) + { + // task is not ready, put it back to the queue + antd_add_task(scheduler, it->task); + free(it); + return 0; + } + + // task is ready for execute, now figure out how it will be executed // check the type of task if(it->task->type == LIGHT || scheduler->n_workers <= 0) { @@ -355,7 +366,7 @@ int antd_task_schedule(antd_scheduler_t* scheduler) } return 1; } -void antd_wait(antd_scheduler_t* scheduler) +void antd_scheduler_wait(antd_scheduler_t* scheduler) { int stat; while(scheduler->status) diff --git a/lib/scheduler.h b/lib/scheduler.h index df2612d..baa4ea0 100644 --- a/lib/scheduler.h +++ b/lib/scheduler.h @@ -9,7 +9,7 @@ #define NORMAL_PRIORITY ((int)((N_PRIORITY - 1) / 2)) #define LOW_PRIORITY (N_PRIORITY - 1) #define HIGH_PRIORITY 0 -#define MAX_VALIDITY_INTERVAL 20 // 10 s for task validity + typedef enum { LIGHT, @@ -92,7 +92,8 @@ typedef struct default to NULL */ void* (*destroy_data)(void*); - int validate_data; + int (*validate_data)(antd_task_t*); + int (*task_ready)(antd_task_t*); } antd_scheduler_t; /* @@ -133,7 +134,7 @@ int antd_task_schedule(antd_scheduler_t *); /* wait for event */ -void antd_wait(antd_scheduler_t *); +void antd_scheduler_wait(antd_scheduler_t *); antd_callback_t* callback_of( void* (*callback)(void*) ); #endif \ No newline at end of file diff --git a/lib/ws.c b/lib/ws.c index 9198686..3b2ba0f 100644 --- a/lib/ws.c +++ b/lib/ws.c @@ -403,7 +403,7 @@ int ws_client_connect(ws_client_t* wsclient, port_config_t pcnf) } // will be free wsclient->antdsock->sock = sock; - wsclient->antdsock->status = 0; + wsclient->antdsock->flags = 0; wsclient->antdsock->last_io = time(NULL); wsclient->antdsock->zstream = NULL; #ifdef USE_OPENSSL