From 353323f93afc879bc3192e2938fd670454d9b16c Mon Sep 17 00:00:00 2001 From: lxsang Date: Wed, 15 Jan 2020 18:27:28 +0100 Subject: [PATCH 1/5] add h2 implement --- Makefile.am | 6 ++- http_server.c | 128 ++++++++++++++++++++--------------------------- http_server.h | 5 +- httpd.c | 96 +++++++++++++++++++++++++++++------ lib/dictionary.c | 41 +++++++++++++-- lib/dictionary.h | 10 +++- lib/h2.c | 101 +++++++++++++++++++++++++++++++++++++ lib/h2.h | 57 +++++++++++++++++++++ lib/handle.c | 19 ++++--- lib/handle.h | 23 ++++++--- lib/hpack.c | 1 + lib/hpack.h | 6 +++ lib/scheduler.c | 17 +++++-- lib/scheduler.h | 7 +-- lib/ws.c | 2 +- 15 files changed, 397 insertions(+), 122 deletions(-) create mode 100644 lib/h2.c create mode 100644 lib/h2.h create mode 100644 lib/hpack.c create mode 100644 lib/hpack.h diff --git a/Makefile.am b/Makefile.am index ad67d06..2dbe5e5 100644 --- a/Makefile.am +++ b/Makefile.am @@ -21,7 +21,8 @@ libantd_la_SOURCES = lib/ini.c \ lib/ws.c \ lib/sha1.c \ lib/list.c \ - lib/scheduler.c + lib/scheduler.c \ + lib/h2.c pkginclude_HEADERS = lib/ini.h \ lib/handle.h \ @@ -32,7 +33,8 @@ pkginclude_HEADERS = lib/ini.h \ lib/sha1.h \ lib/list.h \ lib/scheduler.h \ - lib/plugin.h + lib/plugin.h \ + lib/h2.h EXTRA_DIST = plugin_manager.h http_server.h README.md LICENSE antd-config.ini diff --git a/http_server.c b/http_server.c index 8a1072e..1df5013 100644 --- a/http_server.c +++ b/http_server.c @@ -1,5 +1,5 @@ #include "http_server.h" - +#include "lib/h2.h" //define all basic mime here static mime_t _mimes[] = { {"image/bmp","bmp"}, @@ -302,7 +302,7 @@ void load_config(const char *file) // put it default mimes for(int i = 0; _mimes[i].type != NULL; i++) { - dput(server_config.mimes,_mimes[i].type, strdup(_mimes[i].ext)); + dput_static(server_config.mimes,_mimes[i].type, (void*)_mimes[i].ext); } if (ini_parse(file, config_handler, &server_config) < 0) { @@ -327,9 +327,6 @@ void load_config(const char *file) void *accept_request(void *data) { - char buf[BUFFLEN]; - char *token = NULL; - char *line = NULL; antd_task_t *task; antd_request_t *rq = (antd_request_t *)data; @@ -354,22 +351,13 @@ void *accept_request(void *data) } if (sel == 0 || (!FD_ISSET(client->sock, &read_flags) && !FD_ISSET(client->sock, &write_flags))) { - /*if(client->last_wait == 0) client->last_wait = time(NULL); - // retry it later - if(time(NULL) - client->last_wait > MAX_WAIT_S) - { - LOG("Read and write timeout, give up on %d\n", client->sock); - server_config.connection++; - unknow(rq->client); - return task; - }*/ task->handle = accept_request; return task; } // perform the ssl handshake if enabled #ifdef USE_OPENSSL int ret = -1, stat; - if (client->ssl && client->status == 0) + if (client->ssl && !(client->flags & CLIENT_FL_ACCEPTED) ) { //LOG("Atttempt %d\n", client->attempt); if (SSL_accept((SSL *)client->ssl) == -1) @@ -380,27 +368,17 @@ void *accept_request(void *data) case SSL_ERROR_WANT_READ: case SSL_ERROR_WANT_WRITE: case SSL_ERROR_NONE: - //LOG("RETRY SSL %d\n", client->sock); - /*if(client->last_wait == 0) client->last_wait = time(NULL); - if(time(NULL) - client->last_wait > MAX_WAIT_S) - { - server_config.connection++; - unknow(rq->client); - LOG("SSL timeout, give up on %d\n", client->sock); - return task; - } - task->status = TASK_ACCEPT_SSL_CONT;*/ task->handle = accept_request; return task; default: ERROR("Error performing SSL handshake %d %d %s", stat, ret, ERR_error_string(ERR_get_error(), NULL)); antd_error(rq->client, 400, "Invalid SSL request"); - //server_config.connection++; + ERR_print_errors_fp(stderr); return task; } } - client->status = 1; + client->flags |= CLIENT_FL_ACCEPTED; task->handle = accept_request; //LOG("Handshake finish for %d\n", client->sock); return task; @@ -409,54 +387,22 @@ void *accept_request(void *data) { if (!FD_ISSET(client->sock, &read_flags)) { - /*if(client->last_wait == 0) client->last_wait = time(NULL); - if(time(NULL) - client->last_wait > MAX_WAIT_S) - { - server_config.connection++; - unknow(rq->client); - LOG("Read timeout, give up on %d\n", client->sock); - return task; - }*/ + client->flags |= CLIENT_FL_ACCEPTED; task->handle = accept_request; return task; } } #endif - //LOG("Ready for reading %d\n", client->sock); - //server_config.connection++; - read_buf(rq->client, buf, sizeof(buf)); - line = buf; - // get the method string - token = strsep(&line, " "); - if (!line) + //printf("Flag: %d\n", client->flags); + // now return the task base on the http version + if(client->flags & CLIENT_FL_HTTP_1_1) { - //LOG("No method found"); - antd_error(rq->client, 405, "No method found"); - return task; + task->handle = decode_request_header; } - trim(token, ' '); - trim(line, ' '); - dput(rq->request, "METHOD", strdup(token)); - // get the request - token = strsep(&line, " "); - if (!line) + else { - //LOG("No request found"); - antd_error(rq->client, 400, "Bad request"); - return task; + task->handle = antd_h2_preface_ck; } - trim(token, ' '); - trim(line, ' '); - trim(line, '\n'); - trim(line, '\r'); - dput(rq->request, "PROTOCOL", strdup(line)); - dput(rq->request, "REQUEST_QUERY", strdup(token)); - line = token; - token = strsep(&line, "?"); - dput(rq->request, "REQUEST_PATH", url_decode(token)); - // decode request - // now return the task - task->handle = decode_request_header; return task; } @@ -533,7 +479,7 @@ void *resolve_request(void *data) // find an handler plugin to process it // if the plugin is not found, forbidden access to the file should be sent char *mime_type = mime(path); - dput(rq->request, "RESOURCE_MIME", strdup(mime_type)); + dput_static(rq->request, "RESOURCE_MIME", mime_type); if (strcmp(mime_type, "application/octet-stream") == 0) { char *ex = ext(path); @@ -712,7 +658,7 @@ void *serve_file(void *data) rhd.cookie = NULL; rhd.status = 200; rhd.header = dict(); - dput(rhd.header, "Content-Type", strdup(mime_type)); + dput_static(rhd.header, "Content-Type", mime_type); #ifdef USE_ZLIB if(!compressable(mime_type) || rq->client->z_level == ANTD_CNONE) #endif @@ -720,7 +666,7 @@ void *serve_file(void *data) gmtime_r(&st.st_ctime, &tm); strftime(ibuf, 255, "%a, %d %b %Y %H:%M:%S GMT", &tm); dput(rhd.header, "Last-Modified", strdup(ibuf)); - dput(rhd.header, "Cache-Control", strdup("no-cache")); + dput_static(rhd.header, "Cache-Control", "no-cache"); antd_send_header(rq->client, &rhd); __f(rq->client, path); @@ -818,12 +764,48 @@ void *decode_request_header(void *data) char *query = NULL; char *host = NULL; char buf[2 * BUFFLEN]; + // read the first line + + //server_config.connection++; + read_buf(rq->client, buf, sizeof(buf)); + line = buf; + trim(line, '\n'); + trim(line, '\r'); + // get the method string + token = strsep(&line, " "); + if (!line) + { + //LOG("No method found"); + antd_error(rq->client, 405, "No method found"); + return antd_create_task(NULL, (void *)rq, NULL,rq->client->last_io); + } + trim(token, ' '); + trim(line, ' '); + dput(rq->request, "METHOD", strdup(token)); + // get the request + token = strsep(&line, " "); + if (!line) + { + //LOG("No request found"); + antd_error(rq->client, 400, "Bad request"); + return antd_create_task(NULL, (void *)rq, NULL,rq->client->last_io); + } + trim(token, ' '); + trim(line, ' '); + trim(line, '\n'); + trim(line, '\r'); + dput(rq->request, "PROTOCOL", strdup(line)); + dput(rq->request, "REQUEST_QUERY", strdup(token)); + line = token; + token = strsep(&line, "?"); + dput(rq->request, "REQUEST_PATH", url_decode(token)); + char *url = (char *)dvalue(rq->request, "REQUEST_QUERY"); dictionary_t xheader = dvalue(rq->request, "REQUEST_HEADER"); dictionary_t request = dvalue(rq->request, "REQUEST_DATA"); char* port_s = (char*) dvalue(xheader, "SERVER_PORT"); port_config_t* pcnf = (port_config_t*)dvalue(server_config.ports, port_s); - // first real all header + // this for check if web socket is enabled while ((read_buf(rq->client, buf, sizeof(buf))) && strcmp("\r\n", buf)) { @@ -937,7 +919,7 @@ void *decode_request(void *data) // insert wsocket flag to request // plugin should handle this ugraded connection // not the server - dput(rq->request, "__web_socket__", strdup("1")); + dput_static(rq->request, "__web_socket__", "1"); } // resolve task task->handle = resolve_request; @@ -1002,8 +984,8 @@ void *decode_post_request(void *data) key = ctype; if(pquery) { - dput(request, key, strdup(pquery)); - free(pquery); + dput(request, key, pquery); + //free(pquery); } } return task; diff --git a/http_server.h b/http_server.h index c5c97a4..183dfa9 100644 --- a/http_server.h +++ b/http_server.h @@ -24,9 +24,7 @@ void destroy_config(); void load_config(const char* file); void* accept_request(void*); void* finish_request(void*); -void cat(void*, FILE *); -void cannot_execute(void*); -//int get_line(int, char *, int); + void* serve_file(void*); int startup(unsigned *); int rule_check(const char*, const char*, const char* , const char* , const char* , char*); @@ -41,7 +39,6 @@ void* decode_multi_part_request(void*,const char*); void* decode_multi_part_request_data(void* data); void decode_cookie(const char*, dictionary_t d); char* post_data_decode(void*,int); -void set_nonblock(int); void* execute_plugin(void* data, const char *path); #endif \ No newline at end of file diff --git a/httpd.c b/httpd.c index ccc21c6..0deb04c 100644 --- a/httpd.c +++ b/httpd.c @@ -2,6 +2,7 @@ #include #include "http_server.h" #include "lib/ini.h" +#define MAX_VALIDITY_INTERVAL 20 static antd_scheduler_t scheduler; @@ -42,7 +43,8 @@ SSL_CTX *create_context() } #if OPENSSL_VERSION_NUMBER >= 0x10002000L static unsigned char antd_protocols[] = { - //TODO: add support to HTTP/2 protocol: 2,'h', '2', + //TODO: add support to HTTP/2 protocol: + 2,'h', '2', 8, 'h', 't', 't', 'p', '/', '1', '.', '1' }; static int alpn_advertise_protos_cb(SSL *ssl, const unsigned char **out, unsigned int *outlen,void *arg) @@ -55,10 +57,28 @@ static int alpn_advertise_protos_cb(SSL *ssl, const unsigned char **out, unsigne } static int alpn_select_cb(SSL *ssl, const unsigned char **out, unsigned char *outlen, const unsigned char *in, unsigned int inlen, void *arg) { - UNUSED(ssl); UNUSED(arg); + char buf[64]; if(SSL_select_next_proto((unsigned char **)out, outlen,antd_protocols,sizeof(antd_protocols),in, inlen) == OPENSSL_NPN_NEGOTIATED) { + // set client flag to indicate protocol + int sock = SSL_get_fd(ssl); + if(sock <= 0) + { + return SSL_TLSEXT_ERR_ALERT_FATAL; + } + + antd_client_t* client = SSL_get_ex_data(ssl, sock); + if(!client) + { + return SSL_TLSEXT_ERR_ALERT_FATAL; + } + memcpy(buf,*out,*outlen); + buf[*outlen] = '\0'; + if(strcmp(buf,"http/1.1") !=0 ) + { + client->flags &= ~CLIENT_FL_HTTP_1_1; + } return SSL_TLSEXT_ERR_OK; } else @@ -120,6 +140,11 @@ void configure_context(SSL_CTX *ctx) #endif +void schedule_task(antd_task_t* task) +{ + antd_add_task(&scheduler, task); +} + void stop_serve(int dummy) { UNUSED(dummy); @@ -148,6 +173,52 @@ void stop_serve(int dummy) { sigprocmask(SIG_UNBLOCK, &mask, NULL); } + +static int validate_data(antd_task_t* task) +{ + if(difftime( time(NULL), task->access_time) > MAX_VALIDITY_INTERVAL) + return 0; + return 1; +} + +static int is_task_ready(antd_task_t* task) +{ + antd_request_t* rq = (antd_request_t*)task->data; + if(!rq) return 0; + // check if data is ready for read/write + fd_set read_flags, write_flags; + struct timeval timeout; + FD_ZERO(&read_flags); + FD_SET(rq->client->sock, &read_flags); + FD_ZERO(&write_flags); + FD_SET(rq->client->sock, &write_flags); + timeout.tv_sec = 0; + timeout.tv_usec = 0; + int sel = select(rq->client->sock + 1, &read_flags, &write_flags, (fd_set *)0, &timeout); + if(sel > 0 && (FD_ISSET(rq->client->sock, &read_flags)|| FD_ISSET(rq->client->sock, &write_flags))) + { + if(FD_ISSET(rq->client->sock, &read_flags)) + { + rq->client->flags |= CLIENT_FL_READABLE; + } + else + { + rq->client->flags &= ~CLIENT_FL_READABLE; + } + + if(FD_ISSET(rq->client->sock, &write_flags)) + { + rq->client->flags |= CLIENT_FL_WRITABLE; + } + else + { + rq->client->flags &= ~CLIENT_FL_WRITABLE; + } + return 1; + } + return 0; +} + int main(int argc, char* argv[]) { // load the config first @@ -207,15 +278,16 @@ int main(int argc, char* argv[]) } // default to 4 workers antd_scheduler_init(&scheduler, conf->n_workers); - scheduler.validate_data = 1; + scheduler.validate_data = validate_data; scheduler.destroy_data = finish_request; - // use blocking server_sock + scheduler.task_ready = is_task_ready; + + // make the scheduler wait for event on another thread // this allow to ged rid of high cpu usage on // endless loop without doing anything - // set_nonblock(server_sock); pthread_t scheduler_th; - if (pthread_create(&scheduler_th, NULL,(void *(*)(void *))antd_wait, (void*)&scheduler) != 0) + if (pthread_create(&scheduler_th, NULL,(void *(*)(void *))antd_scheduler_wait, (void*)&scheduler) != 0) { ERROR("pthread_create: cannot create worker"); stop_serve(0); @@ -237,7 +309,6 @@ int main(int argc, char* argv[]) { if(conf->connection > conf->maxcon) { - //ERROR("Reach max connection %d", conf->connection); timeout.tv_sec = 0; timeout.tv_usec = 10000; // 5 ms select(0, NULL, NULL, NULL, &timeout); @@ -267,7 +338,7 @@ int main(int argc, char* argv[]) request->request = dict(); client->zstream = NULL; client->z_level = ANTD_CNONE; - + dictionary_t xheader = dict(); dput(request->request, "REQUEST_HEADER", xheader); dput(request->request, "REQUEST_DATA", dict()); @@ -300,8 +371,9 @@ int main(int argc, char* argv[]) client->sock = client_sock; time(&client->last_io); client->ssl = NULL; + // default selected protocol is http/1.1 + client->flags = CLIENT_FL_HTTP_1_1; #ifdef USE_OPENSSL - client->status = 0; if(pcnf->usessl == 1) { client->ssl = (void*)SSL_new(ctx); @@ -313,12 +385,6 @@ int main(int argc, char* argv[]) { ERROR("Cannot set ex data to ssl client:%d", client->sock); } - /*if (SSL_accept((SSL*)client->ssl) <= 0) { - LOG("EROOR accept\n"); - ERR_print_errors_fp(stderr); - antd_close(client); - continue; - }*/ } #endif conf->connection++; diff --git a/lib/dictionary.c b/lib/dictionary.c index 961ed90..3a389a6 100644 --- a/lib/dictionary.c +++ b/lib/dictionary.c @@ -23,6 +23,7 @@ THE SOFTWARE. */ #include "dictionary.h" #include "utils.h" +#include "list.h" dictionary_t dict() { @@ -66,8 +67,13 @@ chain_t __put_el_with_key(dictionary_t dic, const char* key) if(dic->map == NULL) return NULL; if ((np = dlookup(dic,key)) == NULL) { /* not found */ np = (chain_t) malloc(sizeof(*np)); - if (np == NULL || (np->key = strdup(key)) == NULL) + if (np == NULL) return NULL; + if((np->key = strdup(key)) == NULL) + { + free(np); + return NULL; + } np->value = NULL; hashval = hash(key, dic->cap); np->next = dic->map[hashval]; @@ -77,18 +83,43 @@ chain_t __put_el_with_key(dictionary_t dic, const char* key) // found return np; } -chain_t dput(dictionary_t dic,const char* key, void* value) + +static void free_ditem_value(void* value, antd_dict_item_type_t type) +{ + switch (type) + { + case ANTD_DI_HEAP: + if(value) + free(value); + break; + case ANTD_DI_LIST: + if(value) + list_free((list_t*)&value); + break; + case ANTD_DI_DIC: + if(value) + freedict(value); + default: + break; + } +} + +chain_t insert(dictionary_t dic,const char* key, void* value, antd_dict_item_type_t type) { chain_t np = __put_el_with_key(dic,key); if(np == NULL) { - if(value) free(value); + free_ditem_value(value, type); return NULL; } - if(np->value && value) free(np->value); + if(np->value && value) free_ditem_value(np->value, np->type); + np->type = type; np->value = value; return np; } + + + chain_t dremove(dictionary_t dic, const char* key) { if(dic->map == NULL) return 0; @@ -131,7 +162,7 @@ void free_association(chain_t * asoc) if(a->key) { free(a->key); - if(a->value) free(a->value); + if(a->value) free_ditem_value(a->value, a->type); } free(a); } diff --git a/lib/dictionary.h b/lib/dictionary.h index 9f65b92..5b94054 100644 --- a/lib/dictionary.h +++ b/lib/dictionary.h @@ -25,18 +25,24 @@ THE SOFTWARE. #define DICTIONARY_H #define DHASHSIZE 16 +#define dput(d,k,v) (insert(d,k,v,ANTD_DI_HEAP)) +#define dput_static(d,k,v) (insert(d,k,v,ANTD_DI_STATIC)) +#define dput_list(d,k,v) (insert(d,k,v,ANTD_DI_LIST)) +#define dput_dict(d,k,v) (insert(d,k,v,ANTD_DI_DIC)) #define for_each_assoc(assoc, dic) \ for(unsigned int i = 0; i < dic->cap; i++) \ for(assoc = dic->map[i];assoc!= NULL; assoc = assoc->next) +typedef enum{ANTD_DI_STATIC, ANTD_DI_HEAP, ANTD_DI_LIST, ANTD_DI_DIC} antd_dict_item_type_t; + /** * Dictionary for header */ typedef struct __assoc{ struct __assoc *next; char *key; + antd_dict_item_type_t type; void* value; - //char *value; } * chain_t; typedef chain_t* map_t; @@ -51,7 +57,7 @@ dictionary_t dict(); dictionary_t dict_n(unsigned int n); chain_t dlookup(dictionary_t,const char*); void* dvalue(dictionary_t, const char*); -chain_t dput(dictionary_t,const char*, void*); +chain_t insert(dictionary_t,const char*, void*, antd_dict_item_type_t type); chain_t dremove(dictionary_t, const char*); void freedict(dictionary_t); diff --git a/lib/h2.c b/lib/h2.c new file mode 100644 index 0000000..e84f22a --- /dev/null +++ b/lib/h2.c @@ -0,0 +1,101 @@ +#include "h2.h" +#include "scheduler.h" + +void* antd_h2_preface_ck(void* data) +{ + char buf[25]; + antd_request_t* rq = (antd_request_t*) data; + int count = antd_recv(rq->client,buf,24); + if(count != 24) + { + // TODO servers MUST treat an invalid connection preface as a + // connection error (Section 5.4.1) of type PROTOCOL_ERROR + ERROR("Unable to read preface for client %d: [%s]",rq->client->sock,buf); + return antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); + } + buf[24] = '\0'; + if(strcmp(buf, H2_CONN_PREFACE) != 0) + { + ERROR("Connection preface is not correct for client %d: [%s]",rq->client->sock,buf); + // TODO servers MUST treat an invalid connection preface as a + // connection error (Section 5.4.1) of type PROTOCOL_ERROR + return antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); + } + return antd_create_task(antd_h2_handle, (void *)rq, NULL, rq->client->last_io); +} + +void* antd_h2_handle(void* data) +{ + antd_request_t* rq = (antd_request_t*) data; + antd_task_t* task; + if(rq->client->flags & CLIENT_FL_READABLE) + { + task = antd_create_task(antd_h2_read,(void *)rq, NULL, rq->client->last_io); + task->priority++; + schedule_task(task); + } + if(rq->client->flags & CLIENT_FL_WRITABLE) + { + task = antd_create_task(antd_h2_write,(void *)rq, NULL, rq->client->last_io); + task->priority++; + schedule_task(task); + } + + task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); + task->priority++; + return task; +} + +static int antd_h2_read_frame(antd_client_t* cl, antd_h2_frame_t* frame) +{ + uint8_t tmp; + frame->length = 0; + frame->type = 0; + frame->flags = 0; + frame->identifier= 0; + if( antd_recv(cl,& frame->length,24) != 24) return 0; + printf("length is %d\n", frame->length); + // TODO: + // Values greater than 2^14 (16,384) MUST NOT be + // sent unless the receiver has set a larger value for + // SETTINGS_MAX_FRAME_SIZE. + if( antd_recv(cl,& frame->type,8) != 8) return 0; + printf("type is %d\n", frame->type); + if( antd_recv(cl,& frame->flags,8) != 8) return 0; + if( antd_recv(cl,& tmp,1) != 1) return 0; + // identifier + if( antd_recv(cl,& frame->identifier,31) != 31) return 0; + frame->data = (uint8_t*) malloc(frame->length); + if(!frame->data) return 0; + if( antd_recv(cl,frame->data, frame->length) != frame->length) + { + free(frame->data); + return 0; + } + return 1; +} + + +void* antd_h2_read(void* data) +{ + antd_h2_frame_t frame; + antd_request_t* rq = (antd_request_t*) data; + antd_task_t* task; + if(!antd_h2_read_frame(rq->client, &frame)) + { + // TODO: frame error + printf("error reading frame\n"); + ERROR("Unable to read frame from client %d",rq->client->sock); + task = antd_create_task(NULL, (void *)rq, NULL, time(NULL)); + task->priority++; + return task; + } + // verify frame + printf("Frame type: %d\n", frame.type & 0xff); + return antd_create_task(NULL, data, NULL, time(NULL)); +} +void* antd_h2_write(void* data) +{ + printf("write task\n"); + return antd_create_task(NULL, data, NULL, time(NULL)); +} \ No newline at end of file diff --git a/lib/h2.h b/lib/h2.h new file mode 100644 index 0000000..f169897 --- /dev/null +++ b/lib/h2.h @@ -0,0 +1,57 @@ +#ifndef HTTP2_H +#define HTTP2_H +#include "handle.h" +#include "hpack.h" + +#define H2_CONN_PREFACE "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" + +#define H2_FRM_DATA 0x0 +#define H2_FRM_HEADER 0x1 +#define H2_FRM_PRIORITY 0x2 +#define H2_FRM_RST_STREAM 0x3 +#define H2_FRM_SETTINGS 0x4 +#define H2_FRM_PUSH_PROMISE 0x5 +#define H2_FRM_PING 0x6 +#define H2_FRM_GOAWAY 0x7 +#define H2_FRM_WINDOW_UPDATE 0x8 +#define H2_FRM_CONTINUATION 0x9 + +/** + * Struct that holds a + * h2 connection +*/ +typedef struct { + +} antd_h2_conn_t; + +/** + * Struct that holds a + * h2 stream +*/ +typedef struct { + +} antd_h2_stream_t; + +/** + * a H2 frame +*/ +typedef struct { + // 24 bits length + unsigned int length; + // 8 bits type + uint8_t type; + // 8 bits flags + uint8_t flags; + // 31 bits identifier + unsigned int identifier; + uint8_t* data; +} antd_h2_frame_t; + +void* antd_h2_read(void* rq); +void* antd_h2_write(void* rq); + +void* antd_h2_preface_ck(void* rq); + +void* antd_h2_handle(void* rq); + +#endif \ No newline at end of file diff --git a/lib/handle.c b/lib/handle.c index 4f1dbd9..048e454 100644 --- a/lib/handle.c +++ b/lib/handle.c @@ -1,4 +1,5 @@ -#include "handle.h" +#include "handle.h" + #define HTML_TPL "%s

%s

" static const char* S_100 = "Continue"; @@ -82,6 +83,11 @@ int compressable(char* ctype) return 0; } +void schedule_task(antd_task_t* task) +{ + UNUSED(task); +} + void htdocs(antd_request_t* rq, char* dest) { dictionary_t xheader = (dictionary_t)dvalue(rq->request, "REQUEST_HEADER"); @@ -210,7 +216,7 @@ void antd_send_header(void* cl, antd_response_header_t* res) } else { - client->status = Z_NO_FLUSH; + //client->status = Z_NO_FLUSH; dput(res->header,"Content-Encoding", strdup("gzip")); } } @@ -224,7 +230,7 @@ void antd_send_header(void* cl, antd_response_header_t* res) } else { - client->status = Z_NO_FLUSH; + //client->status = Z_NO_FLUSH; dput(res->header,"Content-Encoding", strdup("deflate")); } } @@ -282,6 +288,7 @@ int antd_send(void *src, const void* data_in, int len_in) antd_client_t * source = (antd_client_t *) src; #ifdef USE_ZLIB + int status = (source->flags & CLIENT_FL_COMPRESSION_END)?Z_NO_FLUSH:Z_FINISH; if(source->zstream && source->z_level != ANTD_CNONE) { antd_compress_t current_zlevel = source->z_level; @@ -296,7 +303,7 @@ int antd_send(void *src, const void* data_in, int len_in) { zstream->avail_out = BUFFLEN; zstream->next_out = buf; - if(deflate(zstream, source->status) == Z_STREAM_ERROR) + if(deflate(zstream,status) == Z_STREAM_ERROR) { source->z_level = current_zlevel; data = NULL; @@ -643,9 +650,9 @@ int antd_close(void* src) //TODO: send finish data to the socket before quit if(source->zstream) { - if(source->status == Z_NO_FLUSH && source->z_level != ANTD_CNONE) + if(!(source->flags & CLIENT_FL_COMPRESSION_END) && source->z_level != ANTD_CNONE) { - source->status = Z_FINISH; + source->flags |= CLIENT_FL_COMPRESSION_END; antd_send(source, "", 0); } deflateEnd(source->zstream); diff --git a/lib/handle.h b/lib/handle.h index 98e0dec..75a9f79 100644 --- a/lib/handle.h +++ b/lib/handle.h @@ -21,6 +21,7 @@ #include "dictionary.h" #include "list.h" #include "ini.h" +#include "scheduler.h" #define SERVER_NAME "Antd" #define IS_POST(method) (strcmp(method,"POST")== 0) @@ -37,7 +38,12 @@ typedef enum {ANTD_CGZ, ANTD_CDEFL, ANTD_CNONE} antd_compress_t; -//extern config_t server_config; +// define the client flag +#define CLIENT_FL_ACCEPTED 0x01 +#define CLIENT_FL_COMPRESSION_END 0x02 +#define CLIENT_FL_HTTP_1_1 0x04 +#define CLIENT_FL_READABLE 0x08 +#define CLIENT_FL_WRITABLE 0x10 typedef struct { unsigned int port; @@ -50,9 +56,9 @@ typedef struct { typedef struct{ int sock; void* ssl; - int status; + uint8_t flags; time_t last_io; - // compress + // compress option antd_compress_t z_level; void* zstream; } antd_client_t; @@ -72,7 +78,7 @@ typedef struct -typedef struct { +typedef struct { //int port; char *plugins_dir; char *plugins_ext; @@ -108,14 +114,15 @@ typedef struct { int raw_body; } plugin_header_t; - -int __attribute__((weak)) require_plugin(const char*); +// some functions that allows access to server +// private data +int __attribute__((weak)) require_plugin(const char*); void __attribute__((weak)) htdocs(antd_request_t* rq, char* dest); void __attribute__((weak)) dbdir(char* dest); void __attribute__((weak)) tmpdir(char* dest); void __attribute__((weak)) plugindir(char* dest); - -int __attribute__((weak)) compressable(char* ctype); +int __attribute__((weak)) compressable(char* ctype); +void __attribute__((weak)) schedule_task(antd_task_t* task); void set_nonblock(int socket); //void set_block(int socket); diff --git a/lib/hpack.c b/lib/hpack.c new file mode 100644 index 0000000..bfd44ee --- /dev/null +++ b/lib/hpack.c @@ -0,0 +1 @@ +#include "hpack.h" \ No newline at end of file diff --git a/lib/hpack.h b/lib/hpack.h new file mode 100644 index 0000000..6ed8fc3 --- /dev/null +++ b/lib/hpack.h @@ -0,0 +1,6 @@ +#ifndef HPACK_H +#define HPACK_H + +// HPACK header compression implementation + +#endif \ No newline at end of file diff --git a/lib/scheduler.c b/lib/scheduler.c index 27eff45..e685a59 100644 --- a/lib/scheduler.c +++ b/lib/scheduler.c @@ -155,8 +155,9 @@ void antd_scheduler_init(antd_scheduler_t* scheduler, int n) scheduler->status = 1; scheduler->workers_queue = NULL; scheduler->pending_task = 0 ; - scheduler->validate_data = 0; + scheduler->validate_data = NULL; scheduler->destroy_data = NULL; + scheduler->task_ready = NULL; // init semaphore scheduler->scheduler_sem = sem_open("scheduler", O_CREAT, 0600, 0); if (scheduler->scheduler_sem == SEM_FAILED) @@ -323,7 +324,7 @@ int antd_task_schedule(antd_scheduler_t* scheduler) } // has the task now // validate the task - if(scheduler->validate_data && difftime( time(NULL), it->task->access_time) > MAX_VALIDITY_INTERVAL) + if(scheduler->validate_data && ! scheduler->validate_data(it->task)) { // data task is not valid LOG("Task data is not valid, task will be killed"); @@ -336,6 +337,16 @@ int antd_task_schedule(antd_scheduler_t* scheduler) return 0; } + // check if the task is ready + if(scheduler->task_ready && !scheduler->task_ready(it->task)) + { + // task is not ready, put it back to the queue + antd_add_task(scheduler, it->task); + free(it); + return 0; + } + + // task is ready for execute, now figure out how it will be executed // check the type of task if(it->task->type == LIGHT || scheduler->n_workers <= 0) { @@ -355,7 +366,7 @@ int antd_task_schedule(antd_scheduler_t* scheduler) } return 1; } -void antd_wait(antd_scheduler_t* scheduler) +void antd_scheduler_wait(antd_scheduler_t* scheduler) { int stat; while(scheduler->status) diff --git a/lib/scheduler.h b/lib/scheduler.h index df2612d..baa4ea0 100644 --- a/lib/scheduler.h +++ b/lib/scheduler.h @@ -9,7 +9,7 @@ #define NORMAL_PRIORITY ((int)((N_PRIORITY - 1) / 2)) #define LOW_PRIORITY (N_PRIORITY - 1) #define HIGH_PRIORITY 0 -#define MAX_VALIDITY_INTERVAL 20 // 10 s for task validity + typedef enum { LIGHT, @@ -92,7 +92,8 @@ typedef struct default to NULL */ void* (*destroy_data)(void*); - int validate_data; + int (*validate_data)(antd_task_t*); + int (*task_ready)(antd_task_t*); } antd_scheduler_t; /* @@ -133,7 +134,7 @@ int antd_task_schedule(antd_scheduler_t *); /* wait for event */ -void antd_wait(antd_scheduler_t *); +void antd_scheduler_wait(antd_scheduler_t *); antd_callback_t* callback_of( void* (*callback)(void*) ); #endif \ No newline at end of file diff --git a/lib/ws.c b/lib/ws.c index 9198686..3b2ba0f 100644 --- a/lib/ws.c +++ b/lib/ws.c @@ -403,7 +403,7 @@ int ws_client_connect(ws_client_t* wsclient, port_config_t pcnf) } // will be free wsclient->antdsock->sock = sock; - wsclient->antdsock->status = 0; + wsclient->antdsock->flags = 0; wsclient->antdsock->last_io = time(NULL); wsclient->antdsock->zstream = NULL; #ifdef USE_OPENSSL From a86a2d150e9cf3e814a4c3ca02bf67ed1fab7b6f Mon Sep 17 00:00:00 2001 From: lxsang Date: Fri, 24 Jan 2020 16:52:44 +0100 Subject: [PATCH 2/5] cont. --- httpd.c | 5 -- lib/h2.c | 119 +++++++++++++++++++++++++++++------------------- lib/h2.h | 82 +++++++++++++++++++++++++++++++-- lib/handle.c | 5 -- lib/handle.h | 1 - lib/scheduler.c | 7 +++ lib/scheduler.h | 2 + 7 files changed, 160 insertions(+), 61 deletions(-) diff --git a/httpd.c b/httpd.c index 0deb04c..83b7920 100644 --- a/httpd.c +++ b/httpd.c @@ -140,11 +140,6 @@ void configure_context(SSL_CTX *ctx) #endif -void schedule_task(antd_task_t* task) -{ - antd_add_task(&scheduler, task); -} - void stop_serve(int dummy) { UNUSED(dummy); diff --git a/lib/h2.c b/lib/h2.c index e84f22a..ed1d49d 100644 --- a/lib/h2.c +++ b/lib/h2.c @@ -1,9 +1,57 @@ #include "h2.h" #include "scheduler.h" +static int antd_h2_read_frame_header(antd_client_t* cl, antd_h2_frame_header_t* frame) +{ + frame->length = 0; + frame->type = 0; + frame->flags = 0; + frame->identifier= 0; + uint8_t header[9]; + if( antd_recv(cl,& header,sizeof(header)) != sizeof(header)) return 0; + // network byte order is big endian + // read frame length + frame->length = (*header << 16) + (*(header + 1)<< 8) + *(header+2); + // frame type + frame->type = *(header + 3); + // frame flags + frame->flags = *(header + 4); + // frame identifier + frame->identifier = ((*(header + 5) & 0x7F) << 24) + (*(header + 6)<< 16) + (*(header + 7)<< 8) + *(header + 8); + + return 1; +} + + +static int process_frame(void* source, antd_h2_frame_header_t* frame_h) +{ + // verify frame + printf("Frame type: %d\n", frame_h->type & 0xff); + printf("Frame flag: %d\n",frame_h->flags); + printf("frame identifier: %d\n", frame_h->identifier); + uint8_t* frame_data = (uint8_t*)malloc(frame_h->length); + if(!frame_data) + { + return 0; + } + antd_request_t* rq = (antd_request_t*) source; + if(antd_recv(rq->client,frame_data,frame_h->length) != frame_h->length) + { + // TODO error + // go away + ERROR("Cannot read all frame data"); + free(frame_data); + return H2_NO_ERROR; + } + + free(frame_data); + return H2_NO_ERROR; +} + void* antd_h2_preface_ck(void* data) { char buf[25]; + antd_h2_frame_header_t frame_h; antd_request_t* rq = (antd_request_t*) data; int count = antd_recv(rq->client,buf,24); if(count != 24) @@ -11,7 +59,7 @@ void* antd_h2_preface_ck(void* data) // TODO servers MUST treat an invalid connection preface as a // connection error (Section 5.4.1) of type PROTOCOL_ERROR ERROR("Unable to read preface for client %d: [%s]",rq->client->sock,buf); - return antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); + return antd_empty_task((void *)rq,rq->client->last_io); } buf[24] = '\0'; if(strcmp(buf, H2_CONN_PREFACE) != 0) @@ -19,8 +67,19 @@ void* antd_h2_preface_ck(void* data) ERROR("Connection preface is not correct for client %d: [%s]",rq->client->sock,buf); // TODO servers MUST treat an invalid connection preface as a // connection error (Section 5.4.1) of type PROTOCOL_ERROR - return antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); + return antd_empty_task((void *)rq, rq->client->last_io); } + // read the setting frame + if(!antd_h2_read_frame_header(rq->client, &frame_h) || frame_h.type != H2_FRM_SETTINGS) + { + // TODO: frame error + // + // send go away with PROTOCOL_ERROR + printf("error reading setting frame\n"); + ERROR("Unable to read setting frame from client %d",rq->client->sock); + return antd_empty_task((void *)rq, rq->client->last_io); + } + process_frame(rq, &frame_h); return antd_create_task(antd_h2_handle, (void *)rq, NULL, rq->client->last_io); } @@ -30,15 +89,11 @@ void* antd_h2_handle(void* data) antd_task_t* task; if(rq->client->flags & CLIENT_FL_READABLE) { - task = antd_create_task(antd_h2_read,(void *)rq, NULL, rq->client->last_io); - task->priority++; - schedule_task(task); + antd_h2_read(data); } if(rq->client->flags & CLIENT_FL_WRITABLE) { - task = antd_create_task(antd_h2_write,(void *)rq, NULL, rq->client->last_io); - task->priority++; - schedule_task(task); + antd_h2_write(data); } task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); @@ -46,56 +101,26 @@ void* antd_h2_handle(void* data) return task; } -static int antd_h2_read_frame(antd_client_t* cl, antd_h2_frame_t* frame) -{ - uint8_t tmp; - frame->length = 0; - frame->type = 0; - frame->flags = 0; - frame->identifier= 0; - if( antd_recv(cl,& frame->length,24) != 24) return 0; - printf("length is %d\n", frame->length); - // TODO: - // Values greater than 2^14 (16,384) MUST NOT be - // sent unless the receiver has set a larger value for - // SETTINGS_MAX_FRAME_SIZE. - if( antd_recv(cl,& frame->type,8) != 8) return 0; - printf("type is %d\n", frame->type); - if( antd_recv(cl,& frame->flags,8) != 8) return 0; - if( antd_recv(cl,& tmp,1) != 1) return 0; - // identifier - if( antd_recv(cl,& frame->identifier,31) != 31) return 0; - frame->data = (uint8_t*) malloc(frame->length); - if(!frame->data) return 0; - if( antd_recv(cl,frame->data, frame->length) != frame->length) - { - free(frame->data); - return 0; - } - return 1; -} + void* antd_h2_read(void* data) { - antd_h2_frame_t frame; + antd_h2_frame_header_t frame_h; antd_request_t* rq = (antd_request_t*) data; - antd_task_t* task; - if(!antd_h2_read_frame(rq->client, &frame)) + if(!antd_h2_read_frame_header(rq->client, &frame_h)) { // TODO: frame error - printf("error reading frame\n"); + // send goaway frame ERROR("Unable to read frame from client %d",rq->client->sock); - task = antd_create_task(NULL, (void *)rq, NULL, time(NULL)); - task->priority++; - return task; + return antd_empty_task(data, rq->client->last_io); } - // verify frame - printf("Frame type: %d\n", frame.type & 0xff); - return antd_create_task(NULL, data, NULL, time(NULL)); + process_frame(data, &frame_h); + return antd_empty_task(data, rq->client->last_io); } void* antd_h2_write(void* data) { + antd_request_t* rq = (antd_request_t*) data; printf("write task\n"); - return antd_create_task(NULL, data, NULL, time(NULL)); + return antd_empty_task(data, rq->client->last_io); } \ No newline at end of file diff --git a/lib/h2.h b/lib/h2.h index f169897..0a9edf7 100644 --- a/lib/h2.h +++ b/lib/h2.h @@ -16,6 +16,81 @@ #define H2_FRM_WINDOW_UPDATE 0x8 #define H2_FRM_CONTINUATION 0x9 +// ERROR code +/* +The associated condition is not a result of an +error. For example, a GOAWAY might include this code to indicate +graceful shutdown of a connection. +*/ +#define H2_NO_ERROR 0x0 +/* +The endpoint detected an unspecific protocol +error. This error is for use when a more specific error code is +not available. +*/ +#define H2_PROTOCOL_ERROR 0x1 +/* +The endpoint encountered an unexpected +internal error. +*/ +#define H2_INTERNAL_ERROR 0x2 +/* +The endpoint detected that its peer +violated the flow-control protocol. +*/ +#define H2_FLOW_CONTROL_ERROR 0x3 +/* +The endpoint sent a SETTINGS frame but did +not receive a response in a timely manner. See Section 6.5.3 +("Settings Synchronization"). +*/ +#define H2_SETTINGS_TIMEOUT 0x4 +/* +The endpoint received a frame after a stream +was half-closed. +*/ +#define H2_STREAM_CLOSED 0x5 +/* +The endpoint received a frame with an invalid size. +*/ +#define H2_FRAME_SIZE_ERROR 0x6 +/* +The endpoint refused the stream prior to +performing any application processing (see Section 8.1.4 for +details). +*/ +#define H2_REFUSED_STREAM 0x7 +/* +Used by the endpoint to indicate that the stream is no +longer needed. +*/ +#define H2_CANCEL 0x8 +/* +The endpoint is unable to maintain the +header compression context for the connection. +*/ +#define H2_COMPRESSION_ERROR 0x9 +/* +The connection established in response to a CONNECT request (Section 8.3) was reset or abnormally closed. +*/ +#define H2_CONNECT_ERROR 0xa +/* +The endpoint detected that its peer is +exhibiting a behavior that might be generating excessive load. +*/ +#define H2_ENHANCE_YOUR_CALM 0xb +/* +The underlying transport has properties +that do not meet minimum security requirements (see Section 9.2). +*/ +#define H2_INADEQUATE_SECURITY 0xc +/* +The endpoint requires that HTTP/1.1 be used +instead of HTTP/2. +*/ +#define H2_HTTP_1_1_REQUIRED 0xd + + /** * Struct that holds a * h2 connection @@ -33,7 +108,7 @@ typedef struct { } antd_h2_stream_t; /** - * a H2 frame + * a H2 frame header */ typedef struct { // 24 bits length @@ -44,8 +119,9 @@ typedef struct { uint8_t flags; // 31 bits identifier unsigned int identifier; - uint8_t* data; -} antd_h2_frame_t; +} antd_h2_frame_header_t; + + void* antd_h2_read(void* rq); void* antd_h2_write(void* rq); diff --git a/lib/handle.c b/lib/handle.c index 048e454..6ceb10e 100644 --- a/lib/handle.c +++ b/lib/handle.c @@ -83,11 +83,6 @@ int compressable(char* ctype) return 0; } -void schedule_task(antd_task_t* task) -{ - UNUSED(task); -} - void htdocs(antd_request_t* rq, char* dest) { dictionary_t xheader = (dictionary_t)dvalue(rq->request, "REQUEST_HEADER"); diff --git a/lib/handle.h b/lib/handle.h index 75a9f79..584da97 100644 --- a/lib/handle.h +++ b/lib/handle.h @@ -122,7 +122,6 @@ void __attribute__((weak)) dbdir(char* dest); void __attribute__((weak)) tmpdir(char* dest); void __attribute__((weak)) plugindir(char* dest); int __attribute__((weak)) compressable(char* ctype); -void __attribute__((weak)) schedule_task(antd_task_t* task); void set_nonblock(int socket); //void set_block(int socket); diff --git a/lib/scheduler.c b/lib/scheduler.c index e685a59..9fdcd31 100644 --- a/lib/scheduler.c +++ b/lib/scheduler.c @@ -379,3 +379,10 @@ void antd_scheduler_wait(antd_scheduler_t* scheduler) } } } + +antd_task_t* antd_empty_task(void* data,time_t t) +{ + antd_task_t* task = antd_create_task(NULL,data,NULL,t); + task->priority++; + return task; +} \ No newline at end of file diff --git a/lib/scheduler.h b/lib/scheduler.h index baa4ea0..781df38 100644 --- a/lib/scheduler.h +++ b/lib/scheduler.h @@ -115,6 +115,8 @@ void antd_scheduler_destroy(antd_scheduler_t *); */ antd_task_t *antd_create_task(void *(*handle)(void *), void *data, void *(*callback)(void *), time_t); +antd_task_t* antd_empty_task(void* data, time_t); + /* add a task */ From 894f6d0f312f249161fd6700fe31f59787d5b7da Mon Sep 17 00:00:00 2001 From: lxsang Date: Tue, 28 Jan 2020 18:02:09 +0100 Subject: [PATCH 3/5] cont --- Makefile.am | 4 +- http_server.c | 54 +++++-- http_server.h | 1 + httpd.c | 2 +- lib/dictionary.c | 2 - lib/h2.c | 369 ++++++++++++++++++++++++++++++++++++++++++++--- lib/h2.h | 64 +++++++- lib/handle.c | 33 ----- lib/handle.h | 1 - lib/queue.c | 84 +++++++++++ lib/queue.h | 20 +++ 11 files changed, 562 insertions(+), 72 deletions(-) create mode 100644 lib/queue.c create mode 100644 lib/queue.h diff --git a/Makefile.am b/Makefile.am index 2dbe5e5..94e92f7 100644 --- a/Makefile.am +++ b/Makefile.am @@ -21,6 +21,7 @@ libantd_la_SOURCES = lib/ini.c \ lib/ws.c \ lib/sha1.c \ lib/list.c \ + lib/queue.c \ lib/scheduler.c \ lib/h2.c @@ -34,7 +35,8 @@ pkginclude_HEADERS = lib/ini.h \ lib/list.h \ lib/scheduler.h \ lib/plugin.h \ - lib/h2.h + lib/h2.h \ + lib/queue.h EXTRA_DIST = plugin_manager.h http_server.h README.md LICENSE antd-config.ini diff --git a/http_server.c b/http_server.c index 1df5013..e3cb1c0 100644 --- a/http_server.c +++ b/http_server.c @@ -392,17 +392,15 @@ void *accept_request(void *data) return task; } } -#endif - //printf("Flag: %d\n", client->flags); - // now return the task base on the http version - if(client->flags & CLIENT_FL_HTTP_1_1) - { - task->handle = decode_request_header; - } - else +#if OPENSSL_VERSION_NUMBER >= 0x10002000L + if(!(client->flags & CLIENT_FL_HTTP_1_1)) { task->handle = antd_h2_preface_ck; + return task; } +#endif +#endif + task->handle = decode_request_header; return task; } @@ -1398,4 +1396,42 @@ int compressable(char* ctype) } return 0; } -#endif \ No newline at end of file +#endif + +void destroy_request(void *data) +{ + if (!data) + return; + antd_request_t *rq = (antd_request_t *)data; + //LOG("Close request %d", rq->client->sock); + // free all other thing + if (rq->request) + { + dictionary_t tmp = dvalue(rq->request, "COOKIE"); + if (tmp) + freedict(tmp); + tmp = dvalue(rq->request, "REQUEST_HEADER"); + if (tmp) + freedict(tmp); + tmp = dvalue(rq->request, "REQUEST_DATA"); + if (tmp) + freedict(tmp); + dput(rq->request, "REQUEST_HEADER", NULL); + dput(rq->request, "REQUEST_DATA", NULL); + dput(rq->request, "COOKIE", NULL); +#ifdef USE_OPENSSL +#if OPENSSL_VERSION_NUMBER >= 0x10002000L + antd_h2_conn_t* conn = H2_CONN(data); + if(conn) + { + //H2_CONNECTION + antd_h2_close_conn(conn); + dput(rq->request, "H2_CONNECTION", NULL); + } +#endif +#endif + freedict(rq->request); + } + antd_close(rq->client); + free(rq); +} \ No newline at end of file diff --git a/http_server.h b/http_server.h index 183dfa9..699cad5 100644 --- a/http_server.h +++ b/http_server.h @@ -40,5 +40,6 @@ void* decode_multi_part_request_data(void* data); void decode_cookie(const char*, dictionary_t d); char* post_data_decode(void*,int); void* execute_plugin(void* data, const char *path); +void destroy_request(void *data); #endif \ No newline at end of file diff --git a/httpd.c b/httpd.c index 83b7920..188a0bc 100644 --- a/httpd.c +++ b/httpd.c @@ -284,7 +284,7 @@ int main(int argc, char* argv[]) pthread_t scheduler_th; if (pthread_create(&scheduler_th, NULL,(void *(*)(void *))antd_scheduler_wait, (void*)&scheduler) != 0) { - ERROR("pthread_create: cannot create worker"); + ERROR("pthread_create: cannot create scheduler thread"); stop_serve(0); exit(1); } diff --git a/lib/dictionary.c b/lib/dictionary.c index 3a389a6..60591b5 100644 --- a/lib/dictionary.c +++ b/lib/dictionary.c @@ -118,8 +118,6 @@ chain_t insert(dictionary_t dic,const char* key, void* value, antd_dict_item_typ return np; } - - chain_t dremove(dictionary_t dic, const char* key) { if(dic->map == NULL) return 0; diff --git a/lib/h2.c b/lib/h2.c index ed1d49d..d73268d 100644 --- a/lib/h2.c +++ b/lib/h2.c @@ -1,6 +1,13 @@ + +#include #include "h2.h" #include "scheduler.h" +struct antd_h2_stream_list_t{ + struct antd_h2_stream_list_t* next; + antd_h2_stream_t* stream; +}; + static int antd_h2_read_frame_header(antd_client_t* cl, antd_h2_frame_header_t* frame) { frame->length = 0; @@ -11,43 +18,191 @@ static int antd_h2_read_frame_header(antd_client_t* cl, antd_h2_frame_header_t* if( antd_recv(cl,& header,sizeof(header)) != sizeof(header)) return 0; // network byte order is big endian // read frame length - frame->length = (*header << 16) + (*(header + 1)<< 8) + *(header+2); + memcpy( ((uint8_t*)(&frame->length)) + 1,header,3); + frame->length = ntohl(frame->length); // frame type frame->type = *(header + 3); // frame flags frame->flags = *(header + 4); - // frame identifier - frame->identifier = ((*(header + 5) & 0x7F) << 24) + (*(header + 6)<< 16) + (*(header + 7)<< 8) + *(header + 8); - + memcpy(&frame->identifier,header+5,4); + frame->identifier = ntohl(frame->identifier) & 0x7FFFFFFF; return 1; } - -static int process_frame(void* source, antd_h2_frame_header_t* frame_h) +static int process_setting_frame(antd_request_t* rq, antd_h2_frame_header_t* frame_h) { - // verify frame - printf("Frame type: %d\n", frame_h->type & 0xff); - printf("Frame flag: %d\n",frame_h->flags); - printf("frame identifier: %d\n", frame_h->identifier); + if(frame_h->length == 0) + return H2_NO_ERROR; + if(frame_h->flags & H2_SETTING_ACK_FLG) + { + return H2_FRAME_SIZE_ERROR; + } + if(frame_h->identifier != 0) + { + return H2_PROTOCOL_ERROR; + } + if(frame_h->length % 6 != 0) + { + return H2_FRAME_SIZE_ERROR; + } + uint8_t* frame_data = (uint8_t*)malloc(frame_h->length); if(!frame_data) { - return 0; + return H2_INTERNAL_ERROR; } - antd_request_t* rq = (antd_request_t*) source; - if(antd_recv(rq->client,frame_data,frame_h->length) != frame_h->length) + antd_h2_conn_t* conn = (antd_h2_conn_t*) dvalue(rq->request,"H2_CONNECTION"); + if(!conn) + { + return H2_INTERNAL_ERROR; + } + if(antd_recv(rq->client,frame_data,frame_h->length) != (int)frame_h->length) { - // TODO error - // go away ERROR("Cannot read all frame data"); free(frame_data); - return H2_NO_ERROR; + return H2_PROTOCOL_ERROR; } + // read each identifier + uint16_t param_id; + int param_val; + uint8_t* ptr; + for (size_t i = 0; i < frame_h->length / 6 ; i++) + { + ptr = frame_data + i*6; + memcpy(¶m_id,ptr,2); + param_id = ntohs(param_id); + ptr += 2; + memcpy(¶m_val,ptr,4); + param_val = ntohl(param_val); + printf("id: %d val: %d\n", param_id, param_val); + switch (param_id) + { + case H2_SETTINGS_HEADER_TABLE_SIZE: + conn->settings.header_table_sz = param_val; + break; + + case H2_SETTINGS_ENABLE_PUSH: + if(param_val != 0 || param_val != 1) + { + free(frame_data); + return H2_PROTOCOL_ERROR; + } + conn->settings.enable_push = param_val; + break; + + case H2_SETTINGS_MAX_CONCURRENT_STREAMS: + conn->settings.max_concurrent_streams = param_val; + break; + case H2_SETTINGS_INITIAL_WINDOW_SIZE: + if(param_val > 2147483647)// 2^31-1 + { + free(frame_data); + return H2_FLOW_CONTROL_ERROR; + } + int offset = param_val - conn->settings.init_win_sz; + conn->settings.init_win_sz = param_val; + // this should applied only to streams with active flow-control windows + antd_h2_update_streams_win_sz(conn->streams[0], offset); + /* + In addition to changing the flow-control window for streams that are + not yet active, a SETTINGS frame can alter the initial flow-control + window size for streams with active flow-control windows (that is, + streams in the "open" or "half-closed (remote)" state). When the + value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust + the size of all stream flow-control windows that it maintains by the + difference between the new value and the old value. + */ + break; + case H2_SETTINGS_MAX_FRAME_SIZE: + if(param_val < 16384 || param_val > 16777215) // < 2^14 or > 2^24-1 + { + free(frame_data); + return H2_PROTOCOL_ERROR; + } + conn->settings.max_frame_sz = param_val; + break; + case H2_SETTINGS_MAX_HEADER_LIST_SIZE: + conn->settings.max_header_list_sz = param_val; + break; + default: + free(frame_data); + return H2_IGNORED; + } + } free(frame_data); + // send back ack setting frame return H2_NO_ERROR; } +static int process_window_update_frame(antd_request_t* rq, antd_h2_frame_header_t* frame_h) +{ + int window_size_incr; + if(antd_recv(rq->client,&window_size_incr, 4) != 4) + { + return H2_PROTOCOL_ERROR; + } + window_size_incr = ntohl(window_size_incr) & 0x7FFFFFFF; + if(window_size_incr <1 || window_size_incr > 2147483647) + return H2_PROTOCOL_ERROR; + antd_h2_conn_t* conn = H2_CONN(rq); + if(!conn) + return H2_INTERNAL_ERROR; + if(frame_h->identifier == 0) + { + conn->win_sz += window_size_incr; + } + else + { + antd_h2_stream_t* stream = antd_h2_get_stream(conn->streams,frame_h->identifier); + if(!stream) + return H2_INTERNAL_ERROR; + stream->win_sz += window_size_incr; + } + return H2_NO_ERROR; +} + +static int process_frame(void* source, antd_h2_frame_header_t* frame_h) +{ + int stat; + switch (frame_h->type) + { + case H2_FRM_SETTINGS: + stat = process_setting_frame(source, frame_h); + break; + case H2_FRM_WINDOW_UPDATE: + stat = process_window_update_frame(source,frame_h); + break; + default: + printf("Frame: %d, length: %d id: %d\n", frame_h->type, frame_h->length, frame_h->identifier); + stat = H2_IGNORED; + } + if(stat == H2_NO_ERROR || stat == H2_IGNORED) + return stat; + + antd_h2_conn_t* conn = H2_CONN(source); + if(conn) + { + // send error frame + antd_h2_frame_header_t header; + header.identifier = frame_h->identifier; + //header.type = stat; + header.flags = 0; + header.length = 8; + uint8_t error_body[8]; + int tmp = htonl(conn->last_stream_id); + memcpy(error_body, &tmp , 4 ); + tmp = htonl(stat); + memcpy(error_body + 4, &tmp ,4); + header.type = H2_FRM_RST_STREAM; + if(frame_h->identifier == 0) + header.type = H2_FRM_GOAWAY; + antd_h2_send_frame(source,&header,error_body); + } + return stat; + +} + void* antd_h2_preface_ck(void* data) { char buf[25]; @@ -79,8 +234,34 @@ void* antd_h2_preface_ck(void* data) ERROR("Unable to read setting frame from client %d",rq->client->sock); return antd_empty_task((void *)rq, rq->client->last_io); } - process_frame(rq, &frame_h); - return antd_create_task(antd_h2_handle, (void *)rq, NULL, rq->client->last_io); + // create a connection + dput(rq->request,"H2_CONNECTION",antd_h2_open_conn()); + int stat = process_frame(rq, &frame_h); + if( stat == H2_NO_ERROR || stat == H2_IGNORED) + { + //TODO: send back setting frame + // and init the conn + antd_h2_frame_header_t header; + header.length = 0; + header.type = H2_FRM_SETTINGS; + header.flags = 0x1; + header.identifier = 0; + if(antd_h2_send_frame(rq, &header,NULL)) + { + printf("frame sent\n"); + return antd_create_task(antd_h2_handle, (void *)rq, NULL, rq->client->last_io); + } + else + { + printf("cannot send frame\n"); + return antd_empty_task(data, rq->client->last_io); + } + } + else + { + printf("Error process frame %d\n", stat); + return antd_empty_task(data, rq->client->last_io); + } } void* antd_h2_handle(void* data) @@ -96,7 +277,7 @@ void* antd_h2_handle(void* data) antd_h2_write(data); } - task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); + task = antd_create_task(antd_h2_handle, (void *)rq, NULL, rq->client->last_io); task->priority++; return task; } @@ -121,6 +302,154 @@ void* antd_h2_read(void* data) void* antd_h2_write(void* data) { antd_request_t* rq = (antd_request_t*) data; - printf("write task\n"); + //printf("write task\n"); return antd_empty_task(data, rq->client->last_io); +} + +antd_h2_conn_t* antd_h2_open_conn() +{ + antd_h2_conn_t* conn = (antd_h2_conn_t*) malloc(sizeof(conn)); + if(! conn) + return NULL; + + conn->settings.header_table_sz = 4096; + conn->settings.enable_push = 1; + conn->settings.max_concurrent_streams = 100; + conn->settings.init_win_sz = 65535; + conn->settings.max_frame_sz = 16384; + conn->settings.max_header_list_sz = 0; //unlimited + conn->win_sz = conn->settings.init_win_sz; + conn->last_stream_id = 0; + conn->streams = (antd_h2_stream_list_t*) malloc(2*sizeof(antd_h2_stream_list_t)); + + return conn; +} + + +void antd_h2_close_conn(antd_h2_conn_t* conn) +{ + if(! conn) + return; + + if(conn->streams) + { + antd_h2_close_all_streams(conn->streams[0]); + antd_h2_close_all_streams(conn->streams[1]); + free(conn->streams); + } + free(conn); +} + +int antd_h2_send_frame(antd_request_t* rq, antd_h2_frame_header_t* fr_header, uint8_t* data) +{ + // send the frame header in network bytes order + uint8_t header[9]; + int nbo_int = htonl(fr_header->length) >> 8; + memcpy(header,&nbo_int,3); + // type + *(header+3) = fr_header->type; + // flag + *(header+4) = fr_header->flags; + // identifier + nbo_int = htonl(fr_header->identifier); + memcpy(header+5, &nbo_int,4); + if(antd_send(rq->client,header,sizeof(header)) != sizeof(header)) + { + return 0; + } + if(fr_header->length == 0) + { + return 1; + } + if(data == NULL) + { + return 0; + } + // send data + if(antd_send(rq->client,data,fr_header->length) !=(int)fr_header->length) + { + return 0; + } + return 1; +} + +/*stream utilities functions*/ +void antd_h2_add_stream(antd_h2_stream_list_t* streams, antd_h2_stream_t* stream) +{ + if(!stream || !streams) return; + int idx = stream->id % 2; + antd_h2_stream_list_t it = (antd_h2_stream_list_t) malloc(sizeof(it)); + it->next = streams[idx]; + it->stream = stream; + streams[idx] = it; +} +antd_h2_stream_t* antd_h2_get_stream(antd_h2_stream_list_t* streams, int id) +{ + int idx = id % 2; + antd_h2_stream_list_t it; + + for(it = streams[idx]; it != NULL; it = it->next) + { + if(id == it->stream->id) + return it->stream; + } + return NULL; +} +void antd_h2_close_stream(antd_h2_stream_t* stream) +{ + if(!stream) return; + if(stream->stdin) free(stream->stdin); + if(stream->stdout) free(stream->stdout); + free(stream); +} + +void antd_h2_close_all_streams(antd_h2_stream_list_t streams) +{ + antd_h2_stream_list_t it; + for(it = streams; it != NULL; it = it->next) + { + if(it->stream) + { + antd_h2_close_stream(it->stream); + } + free(it); + } +} +void antd_h2_update_streams_win_sz(antd_h2_stream_list_t streams, int offset) +{ + antd_h2_stream_list_t it; + for(it = streams; it != NULL; it = it->next) + { + if(it->stream) + { + it->stream->win_sz += offset; + } + } +} + +void antd_h2_del_stream(antd_h2_stream_list_t* streams, int id) +{ + int idx = id % 2; + antd_h2_stream_list_t it; + if(streams[idx] && streams[idx]->stream->id == id) + { + it = streams[idx]; + streams[idx] = it->next; + antd_h2_close_stream(it->stream); + free(it); + return; + } + for(it = streams[idx]; it != NULL; it = it->next) + { + if(it->next!= NULL && id == it->next->stream->id) + { + antd_h2_stream_list_t np; + np = it->next; + it->next = np->next; + np->next = NULL; + antd_h2_close_stream(np->stream); + free(np); + return; + } + } } \ No newline at end of file diff --git a/lib/h2.h b/lib/h2.h index 0a9edf7..0e5699e 100644 --- a/lib/h2.h +++ b/lib/h2.h @@ -1,7 +1,9 @@ #ifndef HTTP2_H #define HTTP2_H + #include "handle.h" #include "hpack.h" +#include "queue.h" #define H2_CONN_PREFACE "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" @@ -90,21 +92,65 @@ instead of HTTP/2. */ #define H2_HTTP_1_1_REQUIRED 0xd +/* +The frame should be ignore +*/ +#define H2_IGNORED 0xe + +/* +SETTING FRAME CONSTs +*/ +/* +When set, bit 0 indicates that this frame acknowledges +receipt and application of the peer's SETTINGS frame. When this +bit is set, the payload of the SETTINGS frame MUST be empty. +Receipt of a SETTINGS frame with the ACK flag set and a length +field value other than 0 MUST be treated as a connection error +(Section 5.4.1) of type FRAME_SIZE_ERROR +*/ +#define H2_SETTING_ACK_FLG 0x1 +#define H2_SETTINGS_HEADER_TABLE_SIZE 0x1 +#define H2_SETTINGS_ENABLE_PUSH 0x2 +#define H2_SETTINGS_MAX_CONCURRENT_STREAMS 0x3 +#define H2_SETTINGS_INITIAL_WINDOW_SIZE 0x4 +#define H2_SETTINGS_MAX_FRAME_SIZE 0x5 +#define H2_SETTINGS_MAX_HEADER_LIST_SIZE 0x6 + + +typedef struct{ + uint32_t header_table_sz; + uint32_t enable_push; + uint32_t max_concurrent_streams; + uint32_t init_win_sz; + uint32_t max_frame_sz; + uint32_t max_header_list_sz; +} antd_h2_conn_setting_t; + + +typedef struct antd_h2_stream_list_t* antd_h2_stream_list_t; /** * Struct that holds a * h2 connection */ typedef struct { - + antd_h2_conn_setting_t settings; + antd_h2_stream_list_t* streams; + int win_sz; + int last_stream_id; } antd_h2_conn_t; +#define H2_CONN(rq) ((antd_h2_conn_t*)dvalue(((antd_request_t*)rq)->request,"H2_CONNECTION")) +#define H2_SETTING(rq) (H2_CON(rq)->settings) /** * Struct that holds a * h2 stream */ typedef struct { - + struct queue_root* stdin; + struct queue_root* stdout; + int win_sz; + int id; } antd_h2_stream_t; /** @@ -121,13 +167,21 @@ typedef struct { unsigned int identifier; } antd_h2_frame_header_t; +/*stream utilities functions*/ +void antd_h2_close_stream(antd_h2_stream_t* stream); +void antd_h2_add_stream(antd_h2_stream_list_t*, antd_h2_stream_t*); +antd_h2_stream_t* antd_h2_get_stream(antd_h2_stream_list_t*, int); +void antd_h2_del_stream(antd_h2_stream_list_t*, int); +void antd_h2_close_all_streams(antd_h2_stream_list_t); +void antd_h2_update_streams_win_sz(antd_h2_stream_list_t streams, int offset); +/*Connection utilities funtions*/ +antd_h2_conn_t* antd_h2_open_conn(); +void antd_h2_close_conn(antd_h2_conn_t*); void* antd_h2_read(void* rq); void* antd_h2_write(void* rq); - void* antd_h2_preface_ck(void* rq); - void* antd_h2_handle(void* rq); - +int antd_h2_send_frame(antd_request_t*, antd_h2_frame_header_t*, uint8_t*); #endif \ No newline at end of file diff --git a/lib/handle.c b/lib/handle.c index 6ceb10e..4739038 100644 --- a/lib/handle.c +++ b/lib/handle.c @@ -1,5 +1,4 @@ #include "handle.h" - #define HTML_TPL "%s

%s

" static const char* S_100 = "Continue"; @@ -849,36 +848,4 @@ int read_buf(void* sock, char*buf,int size) } buf[i] = '\0'; return i; -} -/* - We put it here since we want the plugin is able - to destroy the request if it want to - in this case, the plugin should return an empty - with no data -*/ -void destroy_request(void *data) -{ - if (!data) - return; - antd_request_t *rq = (antd_request_t *)data; - //LOG("Close request %d", rq->client->sock); - // free all other thing - if (rq->request) - { - dictionary_t tmp = dvalue(rq->request, "COOKIE"); - if (tmp) - freedict(tmp); - tmp = dvalue(rq->request, "REQUEST_HEADER"); - if (tmp) - freedict(tmp); - tmp = dvalue(rq->request, "REQUEST_DATA"); - if (tmp) - freedict(tmp); - dput(rq->request, "REQUEST_HEADER", NULL); - dput(rq->request, "REQUEST_DATA", NULL); - dput(rq->request, "COOKIE", NULL); - freedict(rq->request); - } - antd_close(rq->client); - free(rq); } \ No newline at end of file diff --git a/lib/handle.h b/lib/handle.h index 584da97..9e547ec 100644 --- a/lib/handle.h +++ b/lib/handle.h @@ -142,5 +142,4 @@ int read_buf(void* sock,char* buf,int i); int antd_send( void *source, const void* data, int len); int antd_recv( void *source, void* data, int len); int antd_close(void* source); -void destroy_request(void *data); #endif diff --git a/lib/queue.c b/lib/queue.c new file mode 100644 index 0000000..11a6865 --- /dev/null +++ b/lib/queue.c @@ -0,0 +1,84 @@ +// code base on: https://github.com/majek/dump/blob/master/msqueue/queue_semiblocking.c +#include +#include +#include + +#include "queue.h" + +#define QUEUE_POISON1 ((void*)0xCAFEBAB5) + +struct queue_root { + struct queue_head *in_queue; + struct queue_head *out_queue; +// pthread_spinlock_t lock; + pthread_mutex_t lock; +}; + + +#ifndef _cas +# define _cas(ptr, oldval, newval) \ + __sync_bool_compare_and_swap(ptr, oldval, newval) +#endif + +struct queue_root *ALLOC_QUEUE_ROOT() +{ + struct queue_root *root = \ + malloc(sizeof(struct queue_root)); + +// pthread_spin_init(&root->lock, PTHREAD_PROCESS_PRIVATE); + pthread_mutex_init(&root->lock, NULL); + + root->in_queue = NULL; + root->out_queue = NULL; + return root; +} + +void INIT_QUEUE_HEAD(struct queue_head *head) +{ + head->next = QUEUE_POISON1; +} + +void queue_put(struct queue_head *new, + struct queue_root *root) +{ + while (1) { + struct queue_head *in_queue = root->in_queue; + new->next = in_queue; + if (_cas(&root->in_queue, in_queue, new)) { + break; + } + } +} + +struct queue_head *queue_get(struct queue_root *root) +{ +// pthread_spin_lock(&root->lock); + pthread_mutex_lock(&root->lock); + + if (!root->out_queue) { + while (1) { + struct queue_head *head = root->in_queue; + if (!head) { + break; + } + if (_cas(&root->in_queue, head, NULL)) { + // Reverse the order + while (head) { + struct queue_head *next = head->next; + head->next = root->out_queue; + root->out_queue = head; + head = next; + } + break; + } + } + } + + struct queue_head *head = root->out_queue; + if (head) { + root->out_queue = head->next; + } +// pthread_spin_unlock(&root->lock); + pthread_mutex_unlock(&root->lock); + return head; +} \ No newline at end of file diff --git a/lib/queue.h b/lib/queue.h new file mode 100644 index 0000000..f920d7c --- /dev/null +++ b/lib/queue.h @@ -0,0 +1,20 @@ +// code based on : https://github.com/majek/dump/blob/master/msqueue/queue.h + +#ifndef QUEUE_H +#define QUEUE_H + +struct queue_root; + +struct queue_head { + void* data; + struct queue_head *next; +}; + +struct queue_root *ALLOC_QUEUE_ROOT(); +void INIT_QUEUE_HEAD(struct queue_head *head); + +void queue_put(struct queue_head *,struct queue_root *); + +struct queue_head *queue_get(struct queue_root *root); + +#endif // QUEUE_H From cbe7c80a8af28479e18ebd93df31c8fe12511ac7 Mon Sep 17 00:00:00 2001 From: lxsang Date: Wed, 29 Jan 2020 17:09:16 +0100 Subject: [PATCH 4/5] cont --- http_server.c | 16 +-- httpd.c | 32 +++--- lib/h2.c | 262 +++++++++++++++++++++++++++++++++++++++++--------- lib/h2.h | 35 ++++++- lib/handle.c | 39 ++++---- lib/handle.h | 6 +- lib/queue.c | 18 ++++ lib/queue.h | 3 +- lib/ws.c | 12 +-- 9 files changed, 325 insertions(+), 98 deletions(-) diff --git a/http_server.c b/http_server.c index e3cb1c0..b518961 100644 --- a/http_server.c +++ b/http_server.c @@ -336,20 +336,20 @@ void *accept_request(void *data) // first verify if the socket is ready antd_client_t *client = (antd_client_t *)rq->client; FD_ZERO(&read_flags); - FD_SET(rq->client->sock, &read_flags); + FD_SET(rq->client->id, &read_flags); FD_ZERO(&write_flags); - FD_SET(rq->client->sock, &write_flags); + FD_SET(rq->client->id, &write_flags); struct timeval timeout; timeout.tv_sec = 0; timeout.tv_usec = 500; // select - int sel = select(client->sock + 1, &read_flags, &write_flags, (fd_set *)0, &timeout); + int sel = select(client->id + 1, &read_flags, &write_flags, (fd_set *)0, &timeout); if (sel == -1) { antd_error(rq->client, 400, "Bad request"); return task; } - if (sel == 0 || (!FD_ISSET(client->sock, &read_flags) && !FD_ISSET(client->sock, &write_flags))) + if (sel == 0 || (!FD_ISSET(client->id, &read_flags) && !FD_ISSET(client->id, &write_flags))) { task->handle = accept_request; return task; @@ -357,12 +357,12 @@ void *accept_request(void *data) // perform the ssl handshake if enabled #ifdef USE_OPENSSL int ret = -1, stat; - if (client->ssl && !(client->flags & CLIENT_FL_ACCEPTED) ) + if (client->stream && !(client->flags & CLIENT_FL_ACCEPTED) ) { //LOG("Atttempt %d\n", client->attempt); - if (SSL_accept((SSL *)client->ssl) == -1) + if (SSL_accept((SSL *)client->stream) == -1) { - stat = SSL_get_error((SSL *)client->ssl, ret); + stat = SSL_get_error((SSL *)client->stream, ret); switch (stat) { case SSL_ERROR_WANT_READ: @@ -385,7 +385,7 @@ void *accept_request(void *data) } else { - if (!FD_ISSET(client->sock, &read_flags)) + if (!FD_ISSET(client->id, &read_flags)) { client->flags |= CLIENT_FL_ACCEPTED; task->handle = accept_request; diff --git a/httpd.c b/httpd.c index 188a0bc..83f05fa 100644 --- a/httpd.c +++ b/httpd.c @@ -6,6 +6,12 @@ static antd_scheduler_t scheduler; + +void antd_schedule_task(antd_task_t* task) +{ + antd_add_task(&scheduler,task); +} + #ifdef USE_OPENSSL // define the cipher suit used @@ -184,15 +190,15 @@ static int is_task_ready(antd_task_t* task) fd_set read_flags, write_flags; struct timeval timeout; FD_ZERO(&read_flags); - FD_SET(rq->client->sock, &read_flags); + FD_SET(rq->client->id, &read_flags); FD_ZERO(&write_flags); - FD_SET(rq->client->sock, &write_flags); + FD_SET(rq->client->id, &write_flags); timeout.tv_sec = 0; timeout.tv_usec = 0; - int sel = select(rq->client->sock + 1, &read_flags, &write_flags, (fd_set *)0, &timeout); - if(sel > 0 && (FD_ISSET(rq->client->sock, &read_flags)|| FD_ISSET(rq->client->sock, &write_flags))) + int sel = select(rq->client->id + 1, &read_flags, &write_flags, (fd_set *)0, &timeout); + if(sel > 0 && (FD_ISSET(rq->client->id, &read_flags)|| FD_ISSET(rq->client->id, &write_flags))) { - if(FD_ISSET(rq->client->sock, &read_flags)) + if(FD_ISSET(rq->client->id, &read_flags)) { rq->client->flags |= CLIENT_FL_READABLE; } @@ -201,7 +207,7 @@ static int is_task_ready(antd_task_t* task) rq->client->flags &= ~CLIENT_FL_READABLE; } - if(FD_ISSET(rq->client->sock, &write_flags)) + if(FD_ISSET(rq->client->id, &write_flags)) { rq->client->flags |= CLIENT_FL_WRITABLE; } @@ -363,22 +369,22 @@ int main(int argc, char* argv[]) if (setsockopt (client_sock, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout,sizeof(timeout)) < 0) perror("setsockopt failed\n"); */ - client->sock = client_sock; + client->id = client_sock; time(&client->last_io); - client->ssl = NULL; + client->stream = NULL; // default selected protocol is http/1.1 client->flags = CLIENT_FL_HTTP_1_1; #ifdef USE_OPENSSL if(pcnf->usessl == 1) { - client->ssl = (void*)SSL_new(ctx); - if(!client->ssl) continue; - SSL_set_fd((SSL*)client->ssl, client->sock); + client->stream = (void*)SSL_new(ctx); + if(!client->stream) continue; + SSL_set_fd((SSL*)client->stream, client->id); // this can be used in the protocol select callback to // set the protocol selected by the server - if(!SSL_set_ex_data((SSL*)client->ssl, client->sock, client)) + if(!SSL_set_ex_data((SSL*)client->stream, client->id, client)) { - ERROR("Cannot set ex data to ssl client:%d", client->sock); + ERROR("Cannot set ex data to ssl client:%d", client->id); } } #endif diff --git a/lib/h2.c b/lib/h2.c index d73268d..23bccd8 100644 --- a/lib/h2.c +++ b/lib/h2.c @@ -75,7 +75,7 @@ static int process_setting_frame(antd_request_t* rq, antd_h2_frame_header_t* fra ptr += 2; memcpy(¶m_val,ptr,4); param_val = ntohl(param_val); - printf("id: %d val: %d\n", param_id, param_val); + //printf("id: %d val: %d\n", param_id, param_val); switch (param_id) { case H2_SETTINGS_HEADER_TABLE_SIZE: @@ -162,6 +162,160 @@ static int process_window_update_frame(antd_request_t* rq, antd_h2_frame_header_ return H2_NO_ERROR; } +static void antd_h2_error(void* source,int stream_id, int error_code) +{ + // send error frame + antd_h2_conn_t* conn = H2_CONN(source); + if(!conn) return; + antd_h2_frame_header_t header; + header.identifier = stream_id; + //header.type = stat; + header.flags = 0; + header.length = 8; + uint8_t error_body[8]; + int tmp = htonl(conn->last_stream_id); + memcpy(error_body, &tmp , 4 ); + tmp = htonl(error_code); + memcpy(error_body + 4, &tmp ,4); + header.type = H2_FRM_RST_STREAM; + if(stream_id == 0) + header.type = H2_FRM_GOAWAY; + antd_h2_send_frame(source,&header,error_body); +} + +antd_h2_stream_t* antd_h2_init_stream(int id, int wsz) +{ + antd_h2_stream_t* stream = (antd_h2_stream_t*) malloc(sizeof(antd_h2_stream_t)); + if(!stream) return NULL; + stream->id = id; + stream->win_sz = wsz; + stream->state = H2_STR_IDLE; + stream->stdin = ALLOC_QUEUE_ROOT(); + stream->stdout = ALLOC_QUEUE_ROOT(); + //stream->flags = 0; + stream->dependency = 0; + stream->weight = 255; + return stream; +} + +antd_request_t* antd_h2_request_init(antd_request_t* rq, antd_h2_stream_t* stream) +{ + antd_client_t* client = (antd_client_t*)malloc(sizeof(antd_client_t)); + antd_request_t* h2rq = (antd_request_t*)malloc(sizeof(*h2rq)); + h2rq->client = client; + h2rq->request = dict(); + client->zstream = NULL; + client->z_level = ANTD_CNONE; + + dictionary_t h2xheader = dict(); + dictionary_t xheader = (dictionary_t)dvalue(rq->request,"REQUEST_HEADER"); + + dput(h2rq->request, "REQUEST_HEADER", h2xheader); + dput(h2rq->request, "REQUEST_DATA", dict()); + dput_static(h2xheader, "SERVER_PORT", dvalue(xheader,"SERVER_PORT")); + dput_static(h2xheader, "SERVER_WWW_ROOT", dvalue(xheader,"SERVER_WWW_ROOT")); + dput_static(h2xheader, "REMOTE_ADDR", dvalue(xheader,"REMOTE_ADDR")); + client->id = stream->id; + time(&client->last_io); + client->stream = stream; + client->flags = CLIENT_FL_ACCEPTED | CLIENT_FL_H2_STREAM; + return h2rq; +} + +static void* antd_h2_stream_handle(void* data) +{ + antd_request_t* rq = (antd_request_t*) data; + antd_h2_stream_t* stream = (antd_h2_stream_t*) rq->client->stream; + // transition state here + // TODO: next day + return NULL; +} + + +static int process_header_frame(antd_request_t* rq, antd_h2_frame_header_t* frame_h) +{ + if(frame_h->length == 0 || frame_h->identifier == 0) + { + return H2_PROTOCOL_ERROR; + } + uint8_t* data = (uint8_t*) malloc(frame_h->length); + if(!data) + { + return H2_INTERNAL_ERROR; + } + if(antd_recv(rq->client,data,frame_h->length) != (int)frame_h->length) + { + free(data); + return H2_PROTOCOL_ERROR; + } + // now parse the stream + antd_h2_conn_t* conn = H2_CONN(rq); + if(!conn) + { + free(data); + return H2_INTERNAL_ERROR; + } + int is_new_stream = 0; + antd_h2_stream_t* stream = antd_h2_get_stream(conn->streams,frame_h->identifier); + if(stream == NULL) + { + stream = antd_h2_init_stream(frame_h->identifier,conn->settings.init_win_sz); + antd_h2_add_stream(conn->streams,stream); + is_new_stream = 1; + } + if( stream->state == H2_STR_IDLE || stream->state == H2_STR_REV_LOC || stream->state == H2_STR_OPEN || stream->state == H2_STR_HALF_CLOSED_REM ) + { + antd_h2_frame_t* frame = (antd_h2_frame_t*) malloc(sizeof(antd_h2_frame_t)); + frame->header = *frame_h; + frame->pageload = data; + h2_stream_io_put(stream,frame); + if(is_new_stream) + { + // TODO create new request + // just dump the scheduler when we have a connection + antd_schedule_task( antd_create_task(antd_h2_stream_handle, antd_h2_request_init(rq, stream) , NULL, time(NULL))); + } + return H2_NO_ERROR; + } + else + { + free(data); + return H2_PROTOCOL_ERROR; + } +} + +antd_h2_frame_t* h2_streamio_get(struct queue_root* io) +{ + struct queue_head* head = queue_get(io); + if(!head) return NULL; + antd_h2_frame_t* frame = (antd_h2_frame_t*)head->data; + free(head); + return frame; +} + +void h2_stream_io_put(antd_h2_stream_t* stream, antd_h2_frame_t* frame) +{ + struct queue_head* head = (struct queue_head*) malloc(sizeof(struct queue_head)); + INIT_QUEUE_HEAD(head); + head->data = (void*)frame; + if(frame->header.identifier % 2 == 0) + { + queue_put(head, stream->stdin); + } + else + { + queue_put(head, stream->stdout); + } +} + + +void antd_h2_destroy_frame(antd_h2_frame_t* frame) +{ + if(frame->pageload) + free(frame->pageload); + free(frame); +} + static int process_frame(void* source, antd_h2_frame_header_t* frame_h) { int stat; @@ -173,6 +327,9 @@ static int process_frame(void* source, antd_h2_frame_header_t* frame_h) case H2_FRM_WINDOW_UPDATE: stat = process_window_update_frame(source,frame_h); break; + case H2_FRM_HEADER: + stat = process_header_frame(source, frame_h); + break; default: printf("Frame: %d, length: %d id: %d\n", frame_h->type, frame_h->length, frame_h->identifier); stat = H2_IGNORED; @@ -180,25 +337,7 @@ static int process_frame(void* source, antd_h2_frame_header_t* frame_h) if(stat == H2_NO_ERROR || stat == H2_IGNORED) return stat; - antd_h2_conn_t* conn = H2_CONN(source); - if(conn) - { - // send error frame - antd_h2_frame_header_t header; - header.identifier = frame_h->identifier; - //header.type = stat; - header.flags = 0; - header.length = 8; - uint8_t error_body[8]; - int tmp = htonl(conn->last_stream_id); - memcpy(error_body, &tmp , 4 ); - tmp = htonl(stat); - memcpy(error_body + 4, &tmp ,4); - header.type = H2_FRM_RST_STREAM; - if(frame_h->identifier == 0) - header.type = H2_FRM_GOAWAY; - antd_h2_send_frame(source,&header,error_body); - } + antd_h2_error(source, frame_h->identifier,stat); return stat; } @@ -213,15 +352,17 @@ void* antd_h2_preface_ck(void* data) { // TODO servers MUST treat an invalid connection preface as a // connection error (Section 5.4.1) of type PROTOCOL_ERROR - ERROR("Unable to read preface for client %d: [%s]",rq->client->sock,buf); + ERROR("Unable to read preface for client %d: [%s]",rq->client->id,buf); + antd_h2_error(data,0, H2_PROTOCOL_ERROR); return antd_empty_task((void *)rq,rq->client->last_io); } buf[24] = '\0'; if(strcmp(buf, H2_CONN_PREFACE) != 0) { - ERROR("Connection preface is not correct for client %d: [%s]",rq->client->sock,buf); + ERROR("Connection preface is not correct for client %d: [%s]",rq->client->id,buf); // TODO servers MUST treat an invalid connection preface as a // connection error (Section 5.4.1) of type PROTOCOL_ERROR + antd_h2_error(data,0, H2_PROTOCOL_ERROR); return antd_empty_task((void *)rq, rq->client->last_io); } // read the setting frame @@ -230,8 +371,8 @@ void* antd_h2_preface_ck(void* data) // TODO: frame error // // send go away with PROTOCOL_ERROR - printf("error reading setting frame\n"); - ERROR("Unable to read setting frame from client %d",rq->client->sock); + ERROR("Unable to read setting frame from client %d",rq->client->id); + antd_h2_error(data,0, H2_PROTOCOL_ERROR); return antd_empty_task((void *)rq, rq->client->last_io); } // create a connection @@ -248,18 +389,15 @@ void* antd_h2_preface_ck(void* data) header.identifier = 0; if(antd_h2_send_frame(rq, &header,NULL)) { - printf("frame sent\n"); return antd_create_task(antd_h2_handle, (void *)rq, NULL, rq->client->last_io); } else { - printf("cannot send frame\n"); return antd_empty_task(data, rq->client->last_io); } } else { - printf("Error process frame %d\n", stat); return antd_empty_task(data, rq->client->last_io); } } @@ -267,14 +405,20 @@ void* antd_h2_preface_ck(void* data) void* antd_h2_handle(void* data) { antd_request_t* rq = (antd_request_t*) data; - antd_task_t* task; + antd_task_t* task = NULL; if(rq->client->flags & CLIENT_FL_READABLE) { - antd_h2_read(data); + if(!antd_h2_read(data)) + { + return antd_empty_task(data, rq->client->last_io); + } } if(rq->client->flags & CLIENT_FL_WRITABLE) { - antd_h2_write(data); + if(!antd_h2_write(data)) + { + return antd_empty_task(data, rq->client->last_io); + } } task = antd_create_task(antd_h2_handle, (void *)rq, NULL, rq->client->last_io); @@ -285,30 +429,34 @@ void* antd_h2_handle(void* data) -void* antd_h2_read(void* data) +int antd_h2_read(void* data) { antd_h2_frame_header_t frame_h; antd_request_t* rq = (antd_request_t*) data; if(!antd_h2_read_frame_header(rq->client, &frame_h)) { - // TODO: frame error // send goaway frame - ERROR("Unable to read frame from client %d",rq->client->sock); - return antd_empty_task(data, rq->client->last_io); + ERROR("Unable to read frame from client %d",rq->client->id); + antd_h2_error(data, 0, H2_PROTOCOL_ERROR); + return 0; } - process_frame(data, &frame_h); - return antd_empty_task(data, rq->client->last_io); + int stat = process_frame(data, &frame_h); + if(stat == H2_NO_ERROR || stat == H2_IGNORED) + return 1; + + return 0; } -void* antd_h2_write(void* data) +int antd_h2_write(void* data) { - antd_request_t* rq = (antd_request_t*) data; + UNUSED(data); + //antd_request_t* rq = (antd_request_t*) data; //printf("write task\n"); - return antd_empty_task(data, rq->client->last_io); + return 1; } antd_h2_conn_t* antd_h2_open_conn() { - antd_h2_conn_t* conn = (antd_h2_conn_t*) malloc(sizeof(conn)); + antd_h2_conn_t* conn = (antd_h2_conn_t*) malloc(sizeof(antd_h2_conn_t)); if(! conn) return NULL; @@ -321,7 +469,11 @@ antd_h2_conn_t* antd_h2_open_conn() conn->win_sz = conn->settings.init_win_sz; conn->last_stream_id = 0; conn->streams = (antd_h2_stream_list_t*) malloc(2*sizeof(antd_h2_stream_list_t)); - + if(conn->streams) + { + conn->streams[0] = NULL; + conn->streams[1] = NULL; + } return conn; } @@ -333,8 +485,8 @@ void antd_h2_close_conn(antd_h2_conn_t* conn) if(conn->streams) { - antd_h2_close_all_streams(conn->streams[0]); - antd_h2_close_all_streams(conn->streams[1]); + antd_h2_close_all_streams(&conn->streams[0]); + antd_h2_close_all_streams(&conn->streams[1]); free(conn->streams); } free(conn); @@ -398,19 +550,31 @@ antd_h2_stream_t* antd_h2_get_stream(antd_h2_stream_list_t* streams, int id) void antd_h2_close_stream(antd_h2_stream_t* stream) { if(!stream) return; - if(stream->stdin) free(stream->stdin); - if(stream->stdout) free(stream->stdout); - free(stream); + // TODO empty the queue + if(stream->stdin) + { + queue_empty(stream->stdin, (void (*)(void*))antd_h2_destroy_frame); + free(stream->stdin); + } + if(stream->stdout) + { + queue_empty(stream->stdout, (void (*)(void*))antd_h2_destroy_frame); + free(stream->stdout); + } + //free(stream); } -void antd_h2_close_all_streams(antd_h2_stream_list_t streams) +void antd_h2_close_all_streams(antd_h2_stream_list_t* streams) { antd_h2_stream_list_t it; - for(it = streams; it != NULL; it = it->next) + while((*streams) != NULL) { + it = *streams; + (*streams) = (*streams)->next; if(it->stream) { antd_h2_close_stream(it->stream); + free(it->stream); } free(it); } @@ -436,6 +600,7 @@ void antd_h2_del_stream(antd_h2_stream_list_t* streams, int id) it = streams[idx]; streams[idx] = it->next; antd_h2_close_stream(it->stream); + free(it->stream); free(it); return; } @@ -448,6 +613,7 @@ void antd_h2_del_stream(antd_h2_stream_list_t* streams, int id) it->next = np->next; np->next = NULL; antd_h2_close_stream(np->stream); + free(np->stream); free(np); return; } diff --git a/lib/h2.h b/lib/h2.h index 0e5699e..fc66f1b 100644 --- a/lib/h2.h +++ b/lib/h2.h @@ -117,6 +117,17 @@ field value other than 0 MUST be treated as a connection error #define H2_SETTINGS_MAX_HEADER_LIST_SIZE 0x6 +// stream state +typedef enum { + H2_STR_IDLE, + H2_STR_OPEN, + H2_STR_REV_LOC, + H2_STR_REV_REM, + H2_STR_HALF_CLOSED_LOC, + H2_STR_HALF_CLOSED_REM, + H2_STR_CLOSED +} antd_h2_stream_state_t; + typedef struct{ uint32_t header_table_sz; uint32_t enable_push; @@ -150,6 +161,10 @@ typedef struct { struct queue_root* stdin; struct queue_root* stdout; int win_sz; + antd_h2_stream_state_t state; + //uint8_t flags; + int dependency; + uint8_t weight; int id; } antd_h2_stream_t; @@ -167,20 +182,34 @@ typedef struct { unsigned int identifier; } antd_h2_frame_header_t; +typedef struct { + antd_h2_frame_header_t header; + uint8_t* pageload; +} antd_h2_frame_t; + + +/*Frame utilities functions*/ +void antd_h2_destroy_frame(antd_h2_frame_t*); + /*stream utilities functions*/ +antd_h2_stream_t* antd_h2_init_stream(int id, int wsz); void antd_h2_close_stream(antd_h2_stream_t* stream); void antd_h2_add_stream(antd_h2_stream_list_t*, antd_h2_stream_t*); antd_h2_stream_t* antd_h2_get_stream(antd_h2_stream_list_t*, int); void antd_h2_del_stream(antd_h2_stream_list_t*, int); -void antd_h2_close_all_streams(antd_h2_stream_list_t); +void antd_h2_close_all_streams(antd_h2_stream_list_t*); void antd_h2_update_streams_win_sz(antd_h2_stream_list_t streams, int offset); +void h2_stream_io_put(antd_h2_stream_t*, antd_h2_frame_t*); +antd_h2_frame_t* h2_streamio_get(struct queue_root*); +antd_request_t* antd_h2_request_init(antd_request_t*, antd_h2_stream_t*); + /*Connection utilities funtions*/ antd_h2_conn_t* antd_h2_open_conn(); void antd_h2_close_conn(antd_h2_conn_t*); -void* antd_h2_read(void* rq); -void* antd_h2_write(void* rq); +int antd_h2_read(void* rq); +int antd_h2_write(void* rq); void* antd_h2_preface_ck(void* rq); void* antd_h2_handle(void* rq); int antd_h2_send_frame(antd_request_t*, antd_h2_frame_header_t*, uint8_t*); diff --git a/lib/handle.c b/lib/handle.c index 4739038..96179ad 100644 --- a/lib/handle.c +++ b/lib/handle.c @@ -104,6 +104,11 @@ void plugindir(char* dest) UNUSED(dest); } +void antd_schedule_task(antd_task_t* task) +{ + UNUSED(task); +} + const char* get_status_str(int stat) { switch(stat) @@ -326,7 +331,7 @@ int antd_send(void *src, const void* data_in, int len_in) int count; #ifdef USE_OPENSSL - if(source->ssl) + if(source->stream) { //LOG("SSL WRITE\n"); //ret = SSL_write((SSL*) source->ssl, data, len); @@ -339,8 +344,8 @@ int antd_send(void *src, const void* data_in, int len_in) { // clear the error queue ERR_clear_error(); - count = SSL_write (source->ssl, ptr+written, writelen); - int err = SSL_get_error(source->ssl, count); + count = SSL_write (source->stream, ptr+written, writelen); + int err = SSL_get_error(source->stream, count); if (count > 0) { written += count; @@ -378,7 +383,7 @@ int antd_send(void *src, const void* data_in, int len_in) // no data available right now, wait a few seconds in case new data arrives... //printf("SSL_ERROR_WANT_READ\n"); - int sock = SSL_get_rfd(source->ssl); + int sock = SSL_get_rfd(source->stream); FD_ZERO(&fds); FD_SET(sock, &fds); @@ -398,7 +403,7 @@ int antd_send(void *src, const void* data_in, int len_in) { // socket not writable right now, wait a few seconds and try again... //printf("SSL_ERROR_WANT_WRITE \n"); - int sock = SSL_get_wfd(source->ssl); + int sock = SSL_get_wfd(source->stream); FD_ZERO(&fds); FD_SET(sock, &fds); @@ -437,7 +442,7 @@ int antd_send(void *src, const void* data_in, int len_in) written = 0; while (writelen > 0) { - count = send(source->sock, ptr+written, writelen, 0); + count = send(source->id, ptr+written, writelen, 0); if (count > 0) { written += count; @@ -468,7 +473,7 @@ int antd_recv(void *src, void* data, int len) int readlen=0; antd_client_t * source = (antd_client_t *) src; #ifdef USE_OPENSSL - if(source->ssl) + if(source->stream) { ptr = (char* )data; readlen = len > BUFFLEN?BUFFLEN:len; @@ -478,8 +483,8 @@ int antd_recv(void *src, void* data, int len) while (readlen > 0 )//&& source->attempt < MAX_ATTEMPT { ERR_clear_error(); - received = SSL_read (source->ssl, ptr+read, readlen); - int err = SSL_get_error(source->ssl, received); + received = SSL_read (source->stream, ptr+read, readlen); + int err = SSL_get_error(source->stream, received); if (received > 0) { read += received; @@ -518,7 +523,7 @@ int antd_recv(void *src, void* data, int len) // no data available right now, wait a few seconds in case new data arrives... //printf("SSL_ERROR_WANT_READ\n"); - int sock = SSL_get_rfd(source->ssl); + int sock = SSL_get_rfd(source->stream); FD_ZERO(&fds); FD_SET(sock, &fds); @@ -538,7 +543,7 @@ int antd_recv(void *src, void* data, int len) { // socket not writable right now, wait a few seconds and try again... //printf("SSL_ERROR_WANT_WRITE \n"); - int sock = SSL_get_wfd(source->ssl); + int sock = SSL_get_wfd(source->stream); FD_ZERO(&fds); FD_SET(sock, &fds); @@ -595,7 +600,7 @@ int antd_recv(void *src, void* data, int len) read = 0; while (readlen > 0 ) { - received = recv(((int) source->sock), ptr+read, readlen, 0); + received = recv(((int) source->id), ptr+read, readlen, 0); //LOG("Read : %c\n", *ptr); if (received > 0) { @@ -656,24 +661,24 @@ int antd_close(void* src) } #endif #ifdef USE_OPENSSL - if(source->ssl){ + if(source->stream){ //printf("SSL:Shutdown ssl\n"); //SSL_shutdown((SSL*) source->ssl); - SSL_set_shutdown((SSL*) source->ssl, SSL_SENT_SHUTDOWN|SSL_RECEIVED_SHUTDOWN); + SSL_set_shutdown((SSL*) source->stream, SSL_SENT_SHUTDOWN|SSL_RECEIVED_SHUTDOWN); //printf("SSL:Free ssl\n"); - SSL_free((SSL*) source->ssl); + SSL_free((SSL*) source->stream); //EVP_cleanup(); //ENGINE_cleanup(); CRYPTO_cleanup_all_ex_data(); ERR_remove_state(0); ERR_free_strings(); - source->ssl = NULL; + source->stream = NULL; //LOG("Freeing SSL\n"); } #endif //printf("Close sock %d\n", source->sock); - int ret = close(source->sock); + int ret = close(source->id); free(src); src = NULL; return ret; diff --git a/lib/handle.h b/lib/handle.h index 9e547ec..6fd35d7 100644 --- a/lib/handle.h +++ b/lib/handle.h @@ -44,6 +44,7 @@ typedef enum {ANTD_CGZ, ANTD_CDEFL, ANTD_CNONE} antd_compress_t; #define CLIENT_FL_HTTP_1_1 0x04 #define CLIENT_FL_READABLE 0x08 #define CLIENT_FL_WRITABLE 0x10 +#define CLIENT_FL_H2_STREAM 0x20 typedef struct { unsigned int port; @@ -54,8 +55,8 @@ typedef struct { } port_config_t; typedef struct{ - int sock; - void* ssl; + int id; + void* stream; uint8_t flags; time_t last_io; // compress option @@ -122,6 +123,7 @@ void __attribute__((weak)) dbdir(char* dest); void __attribute__((weak)) tmpdir(char* dest); void __attribute__((weak)) plugindir(char* dest); int __attribute__((weak)) compressable(char* ctype); +void __attribute__((weak)) antd_schedule_task(antd_task_t*); void set_nonblock(int socket); //void set_block(int socket); diff --git a/lib/queue.c b/lib/queue.c index 11a6865..20c4bd4 100644 --- a/lib/queue.c +++ b/lib/queue.c @@ -50,6 +50,24 @@ void queue_put(struct queue_head *new, } } +int queue_readable(struct queue_root *root) +{ + return root->out_queue != NULL && root->out_queue != QUEUE_POISON1; +} + +void queue_empty(struct queue_root * root, void (* fn)(void*)) +{ + struct queue_head* head = NULL; + while((head = queue_get(root)) != NULL) + { + if(fn) + { + fn(head->data); + } + free(head); + } +} + struct queue_head *queue_get(struct queue_root *root) { // pthread_spin_lock(&root->lock); diff --git a/lib/queue.h b/lib/queue.h index f920d7c..f398001 100644 --- a/lib/queue.h +++ b/lib/queue.h @@ -14,7 +14,8 @@ struct queue_root *ALLOC_QUEUE_ROOT(); void INIT_QUEUE_HEAD(struct queue_head *head); void queue_put(struct queue_head *,struct queue_root *); - +int queue_readable(struct queue_root *); +void queue_empty(struct queue_root *, void (*)(void*) ); struct queue_head *queue_get(struct queue_root *root); #endif // QUEUE_H diff --git a/lib/ws.c b/lib/ws.c index 3b2ba0f..df4c91c 100644 --- a/lib/ws.c +++ b/lib/ws.c @@ -402,7 +402,7 @@ int ws_client_connect(ws_client_t* wsclient, port_config_t pcnf) return -1; } // will be free - wsclient->antdsock->sock = sock; + wsclient->antdsock->id = sock; wsclient->antdsock->flags = 0; wsclient->antdsock->last_io = time(NULL); wsclient->antdsock->zstream = NULL; @@ -486,19 +486,19 @@ int ws_client_connect(ws_client_t* wsclient, port_config_t pcnf) SSL_CTX_set_verify(wsclient->ssl_ctx, SSL_VERIFY_NONE, NULL); } - wsclient->antdsock->ssl = (void*)SSL_new(wsclient->ssl_ctx); - if(!wsclient->antdsock->ssl) + wsclient->antdsock->stream = (void*)SSL_new(wsclient->ssl_ctx); + if(!wsclient->antdsock->stream) { ssl_err = ERR_get_error(); ERROR("SSL_new: %s", ERR_error_string(ssl_err, NULL)); return -1; } - SSL_set_fd((SSL*)wsclient->antdsock->ssl, wsclient->antdsock->sock); + SSL_set_fd((SSL*)wsclient->antdsock->stream, wsclient->antdsock->id); int stat, ret; ERR_clear_error(); - while( (ret = SSL_connect(wsclient->antdsock->ssl)) <= 0) + while( (ret = SSL_connect(wsclient->antdsock->stream)) <= 0) { - stat = SSL_get_error(wsclient->antdsock->ssl, ret); + stat = SSL_get_error(wsclient->antdsock->stream, ret); switch (stat) { case SSL_ERROR_WANT_READ: From ed6ece1bc6d0fa38fcb3923a4b2ebb20638cd881 Mon Sep 17 00:00:00 2001 From: lxsang Date: Thu, 30 Jan 2020 14:04:07 +0100 Subject: [PATCH 5/5] contd. --- http_server.c | 21 ++++++++--- lib/h2.c | 98 ++++++++++++++++++++++++++++++++++++++++----------- lib/h2.h | 18 +++++++--- lib/handle.c | 36 +++++++++++-------- 4 files changed, 129 insertions(+), 44 deletions(-) diff --git a/http_server.c b/http_server.c index b518961..9e1b6fd 100644 --- a/http_server.c +++ b/http_server.c @@ -1419,14 +1419,25 @@ void destroy_request(void *data) dput(rq->request, "REQUEST_HEADER", NULL); dput(rq->request, "REQUEST_DATA", NULL); dput(rq->request, "COOKIE", NULL); + #ifdef USE_OPENSSL #if OPENSSL_VERSION_NUMBER >= 0x10002000L - antd_h2_conn_t* conn = H2_CONN(data); - if(conn) + if(rq->client->flags & CLIENT_FL_H2_STREAM) { - //H2_CONNECTION - antd_h2_close_conn(conn); - dput(rq->request, "H2_CONNECTION", NULL); + // close the stream + antd_h2_stream_t* stream = (antd_h2_stream_t*)rq->client->stream; + antd_h2_close_stream(stream); + stream->state = H2_STR_FINALIZED; + } + else + { + antd_h2_conn_t* conn = H2_CONN(data); + if(conn) + { + //H2_CONNECTION + antd_h2_close_conn(conn); + dput(rq->request, "H2_CONNECTION", NULL); + } } #endif #endif diff --git a/lib/h2.c b/lib/h2.c index 23bccd8..c42b49d 100644 --- a/lib/h2.c +++ b/lib/h2.c @@ -192,7 +192,7 @@ antd_h2_stream_t* antd_h2_init_stream(int id, int wsz) stream->state = H2_STR_IDLE; stream->stdin = ALLOC_QUEUE_ROOT(); stream->stdout = ALLOC_QUEUE_ROOT(); - //stream->flags = 0; + stream->flags = 0; stream->dependency = 0; stream->weight = 255; return stream; @@ -226,8 +226,33 @@ static void* antd_h2_stream_handle(void* data) { antd_request_t* rq = (antd_request_t*) data; antd_h2_stream_t* stream = (antd_h2_stream_t*) rq->client->stream; - // transition state here - // TODO: next day + antd_task_t* task = NULL; + + if(stream->state == H2_STREAM_CLOSED) + { + return antd_empty_task(data, rq->client->last_io); + } + + // print header info + antd_h2_frame_t* frame = antd_h2_streamio_get(stream->stdin); + if(!frame) + { + task = antd_create_task(antd_h2_stream_handle,data,NULL,rq->client->last_io); + task->priority++; + return task; + } + + // print stream status + printf("End stream: %d\n", stream->flags & H2_END_STREAM_FLG); + printf("Priority %d\n", stream->flags & H2_PRIORITY_FLG ); + printf("Exclusive %d\n", stream->flags & H2_EXCLUSIVE_FLG ); + printf("dependencies %d\n", stream->dependency); + printf("weight %d\n", stream->weight); + + printf("frame end stream %d\n", frame->header.flags & H2_END_STREAM_FLG); + printf("frame end header %d\n", frame->header.flags & H2_END_HEADERS_FLG); + printf("frame padded %d\n", frame->header.flags & H2_PADDED_FLG); + printf("frame priority %d\n", frame->header.flags & H2_PRIORITY_FLG); return NULL; } @@ -263,28 +288,59 @@ static int process_header_frame(antd_request_t* rq, antd_h2_frame_header_t* fram antd_h2_add_stream(conn->streams,stream); is_new_stream = 1; } - if( stream->state == H2_STR_IDLE || stream->state == H2_STR_REV_LOC || stream->state == H2_STR_OPEN || stream->state == H2_STR_HALF_CLOSED_REM ) - { - antd_h2_frame_t* frame = (antd_h2_frame_t*) malloc(sizeof(antd_h2_frame_t)); - frame->header = *frame_h; - frame->pageload = data; - h2_stream_io_put(stream,frame); - if(is_new_stream) - { - // TODO create new request - // just dump the scheduler when we have a connection - antd_schedule_task( antd_create_task(antd_h2_stream_handle, antd_h2_request_init(rq, stream) , NULL, time(NULL))); - } - return H2_NO_ERROR; - } - else + + // switch state here + switch (stream->state) { + case H2_STR_IDLE: + stream->state = H2_STR_OPEN; + break; + + case H2_STR_REV_REM: + stream->state = H2_STR_HALF_CLOSED_LOC; + break; + + default: free(data); return H2_PROTOCOL_ERROR; } + + antd_h2_frame_t* frame = (antd_h2_frame_t*) malloc(sizeof(antd_h2_frame_t)); + frame->header = *frame_h; + frame->pageload = data; + + stream->flags = (frame_h->flags & H2_END_STREAM_FLG) | (frame_h->flags & H2_PRIORITY_FLG); + if(frame_h->flags & H2_PRIORITY_FLG) + { + uint8_t* ptr = data; + if(frame_h->flags & H2_PADDED_FLG) + ptr += 1; + // set stream weight + dependencies + memcpy(&stream->dependency, ptr, 4); + printf("%u\n", stream->dependency); + stream->dependency = ntohl(stream->dependency) & 0x7FFFFFFF; + printf("%d\n", *ptr); + if(*(ptr) & 0x80) + { + stream->flags |= H2_EXCLUSIVE_FLG; + } + else + { + stream->flags &= ~H2_EXCLUSIVE_FLG; + } + stream->weight = *(data + 4); + // set flag + } + + h2_stream_io_put(stream,frame); + if(is_new_stream) + { + antd_schedule_task( antd_create_task(antd_h2_stream_handle, antd_h2_request_init(rq, stream) , NULL, time(NULL))); + } + return H2_NO_ERROR; } -antd_h2_frame_t* h2_streamio_get(struct queue_root* io) +antd_h2_frame_t* antd_h2_streamio_get(struct queue_root* io) { struct queue_head* head = queue_get(io); if(!head) return NULL; @@ -300,11 +356,11 @@ void h2_stream_io_put(antd_h2_stream_t* stream, antd_h2_frame_t* frame) head->data = (void*)frame; if(frame->header.identifier % 2 == 0) { - queue_put(head, stream->stdin); + queue_put(head, stream->stdout); } else { - queue_put(head, stream->stdout); + queue_put(head, stream->stdin); } } diff --git a/lib/h2.h b/lib/h2.h index fc66f1b..4c6a868 100644 --- a/lib/h2.h +++ b/lib/h2.h @@ -117,6 +117,15 @@ field value other than 0 MUST be treated as a connection error #define H2_SETTINGS_MAX_HEADER_LIST_SIZE 0x6 +/*header flag*/ +#define H2_END_STREAM_FLG 0x1 +#define H2_END_HEADERS_FLG 0x4 +#define H2_PADDED_FLG 0x8 +#define H2_PRIORITY_FLG 0x20 +#define H2_EXCLUSIVE_FLG 0x40 + + + // stream state typedef enum { H2_STR_IDLE, @@ -125,7 +134,8 @@ typedef enum { H2_STR_REV_REM, H2_STR_HALF_CLOSED_LOC, H2_STR_HALF_CLOSED_REM, - H2_STR_CLOSED + H2_STR_CLOSED, + H2_STR_FINALIZED } antd_h2_stream_state_t; typedef struct{ @@ -162,8 +172,8 @@ typedef struct { struct queue_root* stdout; int win_sz; antd_h2_stream_state_t state; - //uint8_t flags; - int dependency; + uint8_t flags; + uint32_t dependency; uint8_t weight; int id; } antd_h2_stream_t; @@ -200,7 +210,7 @@ void antd_h2_del_stream(antd_h2_stream_list_t*, int); void antd_h2_close_all_streams(antd_h2_stream_list_t*); void antd_h2_update_streams_win_sz(antd_h2_stream_list_t streams, int offset); void h2_stream_io_put(antd_h2_stream_t*, antd_h2_frame_t*); -antd_h2_frame_t* h2_streamio_get(struct queue_root*); +antd_h2_frame_t* antd_h2_streamio_get(struct queue_root*); antd_request_t* antd_h2_request_init(antd_request_t*, antd_h2_stream_t*); diff --git a/lib/handle.c b/lib/handle.c index 96179ad..bc8b22a 100644 --- a/lib/handle.c +++ b/lib/handle.c @@ -1,4 +1,5 @@ #include "handle.h" + #define HTML_TPL "%s

%s

" static const char* S_100 = "Continue"; @@ -661,21 +662,28 @@ int antd_close(void* src) } #endif #ifdef USE_OPENSSL - if(source->stream){ - //printf("SSL:Shutdown ssl\n"); - //SSL_shutdown((SSL*) source->ssl); - SSL_set_shutdown((SSL*) source->stream, SSL_SENT_SHUTDOWN|SSL_RECEIVED_SHUTDOWN); - //printf("SSL:Free ssl\n"); - SSL_free((SSL*) source->stream); - - //EVP_cleanup(); - //ENGINE_cleanup(); - CRYPTO_cleanup_all_ex_data(); - ERR_remove_state(0); - ERR_free_strings(); - source->stream = NULL; - //LOG("Freeing SSL\n"); +#if OPENSSL_VERSION_NUMBER >= 0x10002000L + if(!(source->flags & CLIENT_FL_H2_STREAM)) + { +#endif + if(source->stream){ + //printf("SSL:Shutdown ssl\n"); + //SSL_shutdown((SSL*) source->ssl); + SSL_set_shutdown((SSL*) source->stream, SSL_SENT_SHUTDOWN|SSL_RECEIVED_SHUTDOWN); + //printf("SSL:Free ssl\n"); + SSL_free((SSL*) source->stream); + + //EVP_cleanup(); + //ENGINE_cleanup(); + CRYPTO_cleanup_all_ex_data(); + ERR_remove_state(0); + ERR_free_strings(); + source->stream = NULL; + //LOG("Freeing SSL\n"); + } +#if OPENSSL_VERSION_NUMBER >= 0x10002000L } +#endif #endif //printf("Close sock %d\n", source->sock); int ret = close(source->id);