diff --git a/Makefile.am b/Makefile.am index 2dbe5e5..94e92f7 100644 --- a/Makefile.am +++ b/Makefile.am @@ -21,6 +21,7 @@ libantd_la_SOURCES = lib/ini.c \ lib/ws.c \ lib/sha1.c \ lib/list.c \ + lib/queue.c \ lib/scheduler.c \ lib/h2.c @@ -34,7 +35,8 @@ pkginclude_HEADERS = lib/ini.h \ lib/list.h \ lib/scheduler.h \ lib/plugin.h \ - lib/h2.h + lib/h2.h \ + lib/queue.h EXTRA_DIST = plugin_manager.h http_server.h README.md LICENSE antd-config.ini diff --git a/http_server.c b/http_server.c index 038cb5f..16ff1ae 100644 --- a/http_server.c +++ b/http_server.c @@ -319,17 +319,15 @@ void *accept_request(void *data) return task; } } -#endif - //printf("Flag: %d\n", client->flags); - // now return the task base on the http version - if(client->flags & CLIENT_FL_HTTP_1_1) - { - task->handle = decode_request_header; - } - else +#if OPENSSL_VERSION_NUMBER >= 0x10002000L + if(!(client->flags & CLIENT_FL_HTTP_1_1)) { task->handle = antd_h2_preface_ck; + return task; } +#endif +#endif + task->handle = decode_request_header; return task; } @@ -1325,4 +1323,42 @@ int compressable(char* ctype) } return 0; } -#endif \ No newline at end of file +#endif + +void destroy_request(void *data) +{ + if (!data) + return; + antd_request_t *rq = (antd_request_t *)data; + //LOG("Close request %d", rq->client->sock); + // free all other thing + if (rq->request) + { + dictionary_t tmp = dvalue(rq->request, "COOKIE"); + if (tmp) + freedict(tmp); + tmp = dvalue(rq->request, "REQUEST_HEADER"); + if (tmp) + freedict(tmp); + tmp = dvalue(rq->request, "REQUEST_DATA"); + if (tmp) + freedict(tmp); + dput(rq->request, "REQUEST_HEADER", NULL); + dput(rq->request, "REQUEST_DATA", NULL); + dput(rq->request, "COOKIE", NULL); +#ifdef USE_OPENSSL +#if OPENSSL_VERSION_NUMBER >= 0x10002000L + antd_h2_conn_t* conn = H2_CONN(data); + if(conn) + { + //H2_CONNECTION + antd_h2_close_conn(conn); + dput(rq->request, "H2_CONNECTION", NULL); + } +#endif +#endif + freedict(rq->request); + } + antd_close(rq->client); + free(rq); +} \ No newline at end of file diff --git a/http_server.h b/http_server.h index 183dfa9..699cad5 100644 --- a/http_server.h +++ b/http_server.h @@ -40,5 +40,6 @@ void* decode_multi_part_request_data(void* data); void decode_cookie(const char*, dictionary_t d); char* post_data_decode(void*,int); void* execute_plugin(void* data, const char *path); +void destroy_request(void *data); #endif \ No newline at end of file diff --git a/httpd.c b/httpd.c index 795041f..6278669 100644 --- a/httpd.c +++ b/httpd.c @@ -289,7 +289,7 @@ int main(int argc, char* argv[]) pthread_t scheduler_th; if (pthread_create(&scheduler_th, NULL,(void *(*)(void *))antd_scheduler_wait, (void*)&scheduler) != 0) { - ERROR("pthread_create: cannot create worker"); + ERROR("pthread_create: cannot create scheduler thread"); stop_serve(0); exit(1); } diff --git a/lib/dictionary.c b/lib/dictionary.c index 3a389a6..60591b5 100644 --- a/lib/dictionary.c +++ b/lib/dictionary.c @@ -118,8 +118,6 @@ chain_t insert(dictionary_t dic,const char* key, void* value, antd_dict_item_typ return np; } - - chain_t dremove(dictionary_t dic, const char* key) { if(dic->map == NULL) return 0; diff --git a/lib/h2.c b/lib/h2.c index ed1d49d..d73268d 100644 --- a/lib/h2.c +++ b/lib/h2.c @@ -1,6 +1,13 @@ + +#include #include "h2.h" #include "scheduler.h" +struct antd_h2_stream_list_t{ + struct antd_h2_stream_list_t* next; + antd_h2_stream_t* stream; +}; + static int antd_h2_read_frame_header(antd_client_t* cl, antd_h2_frame_header_t* frame) { frame->length = 0; @@ -11,43 +18,191 @@ static int antd_h2_read_frame_header(antd_client_t* cl, antd_h2_frame_header_t* if( antd_recv(cl,& header,sizeof(header)) != sizeof(header)) return 0; // network byte order is big endian // read frame length - frame->length = (*header << 16) + (*(header + 1)<< 8) + *(header+2); + memcpy( ((uint8_t*)(&frame->length)) + 1,header,3); + frame->length = ntohl(frame->length); // frame type frame->type = *(header + 3); // frame flags frame->flags = *(header + 4); - // frame identifier - frame->identifier = ((*(header + 5) & 0x7F) << 24) + (*(header + 6)<< 16) + (*(header + 7)<< 8) + *(header + 8); - + memcpy(&frame->identifier,header+5,4); + frame->identifier = ntohl(frame->identifier) & 0x7FFFFFFF; return 1; } - -static int process_frame(void* source, antd_h2_frame_header_t* frame_h) +static int process_setting_frame(antd_request_t* rq, antd_h2_frame_header_t* frame_h) { - // verify frame - printf("Frame type: %d\n", frame_h->type & 0xff); - printf("Frame flag: %d\n",frame_h->flags); - printf("frame identifier: %d\n", frame_h->identifier); + if(frame_h->length == 0) + return H2_NO_ERROR; + if(frame_h->flags & H2_SETTING_ACK_FLG) + { + return H2_FRAME_SIZE_ERROR; + } + if(frame_h->identifier != 0) + { + return H2_PROTOCOL_ERROR; + } + if(frame_h->length % 6 != 0) + { + return H2_FRAME_SIZE_ERROR; + } + uint8_t* frame_data = (uint8_t*)malloc(frame_h->length); if(!frame_data) { - return 0; + return H2_INTERNAL_ERROR; } - antd_request_t* rq = (antd_request_t*) source; - if(antd_recv(rq->client,frame_data,frame_h->length) != frame_h->length) + antd_h2_conn_t* conn = (antd_h2_conn_t*) dvalue(rq->request,"H2_CONNECTION"); + if(!conn) + { + return H2_INTERNAL_ERROR; + } + if(antd_recv(rq->client,frame_data,frame_h->length) != (int)frame_h->length) { - // TODO error - // go away ERROR("Cannot read all frame data"); free(frame_data); - return H2_NO_ERROR; + return H2_PROTOCOL_ERROR; } + // read each identifier + uint16_t param_id; + int param_val; + uint8_t* ptr; + for (size_t i = 0; i < frame_h->length / 6 ; i++) + { + ptr = frame_data + i*6; + memcpy(¶m_id,ptr,2); + param_id = ntohs(param_id); + ptr += 2; + memcpy(¶m_val,ptr,4); + param_val = ntohl(param_val); + printf("id: %d val: %d\n", param_id, param_val); + switch (param_id) + { + case H2_SETTINGS_HEADER_TABLE_SIZE: + conn->settings.header_table_sz = param_val; + break; + + case H2_SETTINGS_ENABLE_PUSH: + if(param_val != 0 || param_val != 1) + { + free(frame_data); + return H2_PROTOCOL_ERROR; + } + conn->settings.enable_push = param_val; + break; + + case H2_SETTINGS_MAX_CONCURRENT_STREAMS: + conn->settings.max_concurrent_streams = param_val; + break; + case H2_SETTINGS_INITIAL_WINDOW_SIZE: + if(param_val > 2147483647)// 2^31-1 + { + free(frame_data); + return H2_FLOW_CONTROL_ERROR; + } + int offset = param_val - conn->settings.init_win_sz; + conn->settings.init_win_sz = param_val; + // this should applied only to streams with active flow-control windows + antd_h2_update_streams_win_sz(conn->streams[0], offset); + /* + In addition to changing the flow-control window for streams that are + not yet active, a SETTINGS frame can alter the initial flow-control + window size for streams with active flow-control windows (that is, + streams in the "open" or "half-closed (remote)" state). When the + value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust + the size of all stream flow-control windows that it maintains by the + difference between the new value and the old value. + */ + break; + case H2_SETTINGS_MAX_FRAME_SIZE: + if(param_val < 16384 || param_val > 16777215) // < 2^14 or > 2^24-1 + { + free(frame_data); + return H2_PROTOCOL_ERROR; + } + conn->settings.max_frame_sz = param_val; + break; + case H2_SETTINGS_MAX_HEADER_LIST_SIZE: + conn->settings.max_header_list_sz = param_val; + break; + default: + free(frame_data); + return H2_IGNORED; + } + } free(frame_data); + // send back ack setting frame return H2_NO_ERROR; } +static int process_window_update_frame(antd_request_t* rq, antd_h2_frame_header_t* frame_h) +{ + int window_size_incr; + if(antd_recv(rq->client,&window_size_incr, 4) != 4) + { + return H2_PROTOCOL_ERROR; + } + window_size_incr = ntohl(window_size_incr) & 0x7FFFFFFF; + if(window_size_incr <1 || window_size_incr > 2147483647) + return H2_PROTOCOL_ERROR; + antd_h2_conn_t* conn = H2_CONN(rq); + if(!conn) + return H2_INTERNAL_ERROR; + if(frame_h->identifier == 0) + { + conn->win_sz += window_size_incr; + } + else + { + antd_h2_stream_t* stream = antd_h2_get_stream(conn->streams,frame_h->identifier); + if(!stream) + return H2_INTERNAL_ERROR; + stream->win_sz += window_size_incr; + } + return H2_NO_ERROR; +} + +static int process_frame(void* source, antd_h2_frame_header_t* frame_h) +{ + int stat; + switch (frame_h->type) + { + case H2_FRM_SETTINGS: + stat = process_setting_frame(source, frame_h); + break; + case H2_FRM_WINDOW_UPDATE: + stat = process_window_update_frame(source,frame_h); + break; + default: + printf("Frame: %d, length: %d id: %d\n", frame_h->type, frame_h->length, frame_h->identifier); + stat = H2_IGNORED; + } + if(stat == H2_NO_ERROR || stat == H2_IGNORED) + return stat; + + antd_h2_conn_t* conn = H2_CONN(source); + if(conn) + { + // send error frame + antd_h2_frame_header_t header; + header.identifier = frame_h->identifier; + //header.type = stat; + header.flags = 0; + header.length = 8; + uint8_t error_body[8]; + int tmp = htonl(conn->last_stream_id); + memcpy(error_body, &tmp , 4 ); + tmp = htonl(stat); + memcpy(error_body + 4, &tmp ,4); + header.type = H2_FRM_RST_STREAM; + if(frame_h->identifier == 0) + header.type = H2_FRM_GOAWAY; + antd_h2_send_frame(source,&header,error_body); + } + return stat; + +} + void* antd_h2_preface_ck(void* data) { char buf[25]; @@ -79,8 +234,34 @@ void* antd_h2_preface_ck(void* data) ERROR("Unable to read setting frame from client %d",rq->client->sock); return antd_empty_task((void *)rq, rq->client->last_io); } - process_frame(rq, &frame_h); - return antd_create_task(antd_h2_handle, (void *)rq, NULL, rq->client->last_io); + // create a connection + dput(rq->request,"H2_CONNECTION",antd_h2_open_conn()); + int stat = process_frame(rq, &frame_h); + if( stat == H2_NO_ERROR || stat == H2_IGNORED) + { + //TODO: send back setting frame + // and init the conn + antd_h2_frame_header_t header; + header.length = 0; + header.type = H2_FRM_SETTINGS; + header.flags = 0x1; + header.identifier = 0; + if(antd_h2_send_frame(rq, &header,NULL)) + { + printf("frame sent\n"); + return antd_create_task(antd_h2_handle, (void *)rq, NULL, rq->client->last_io); + } + else + { + printf("cannot send frame\n"); + return antd_empty_task(data, rq->client->last_io); + } + } + else + { + printf("Error process frame %d\n", stat); + return antd_empty_task(data, rq->client->last_io); + } } void* antd_h2_handle(void* data) @@ -96,7 +277,7 @@ void* antd_h2_handle(void* data) antd_h2_write(data); } - task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); + task = antd_create_task(antd_h2_handle, (void *)rq, NULL, rq->client->last_io); task->priority++; return task; } @@ -121,6 +302,154 @@ void* antd_h2_read(void* data) void* antd_h2_write(void* data) { antd_request_t* rq = (antd_request_t*) data; - printf("write task\n"); + //printf("write task\n"); return antd_empty_task(data, rq->client->last_io); +} + +antd_h2_conn_t* antd_h2_open_conn() +{ + antd_h2_conn_t* conn = (antd_h2_conn_t*) malloc(sizeof(conn)); + if(! conn) + return NULL; + + conn->settings.header_table_sz = 4096; + conn->settings.enable_push = 1; + conn->settings.max_concurrent_streams = 100; + conn->settings.init_win_sz = 65535; + conn->settings.max_frame_sz = 16384; + conn->settings.max_header_list_sz = 0; //unlimited + conn->win_sz = conn->settings.init_win_sz; + conn->last_stream_id = 0; + conn->streams = (antd_h2_stream_list_t*) malloc(2*sizeof(antd_h2_stream_list_t)); + + return conn; +} + + +void antd_h2_close_conn(antd_h2_conn_t* conn) +{ + if(! conn) + return; + + if(conn->streams) + { + antd_h2_close_all_streams(conn->streams[0]); + antd_h2_close_all_streams(conn->streams[1]); + free(conn->streams); + } + free(conn); +} + +int antd_h2_send_frame(antd_request_t* rq, antd_h2_frame_header_t* fr_header, uint8_t* data) +{ + // send the frame header in network bytes order + uint8_t header[9]; + int nbo_int = htonl(fr_header->length) >> 8; + memcpy(header,&nbo_int,3); + // type + *(header+3) = fr_header->type; + // flag + *(header+4) = fr_header->flags; + // identifier + nbo_int = htonl(fr_header->identifier); + memcpy(header+5, &nbo_int,4); + if(antd_send(rq->client,header,sizeof(header)) != sizeof(header)) + { + return 0; + } + if(fr_header->length == 0) + { + return 1; + } + if(data == NULL) + { + return 0; + } + // send data + if(antd_send(rq->client,data,fr_header->length) !=(int)fr_header->length) + { + return 0; + } + return 1; +} + +/*stream utilities functions*/ +void antd_h2_add_stream(antd_h2_stream_list_t* streams, antd_h2_stream_t* stream) +{ + if(!stream || !streams) return; + int idx = stream->id % 2; + antd_h2_stream_list_t it = (antd_h2_stream_list_t) malloc(sizeof(it)); + it->next = streams[idx]; + it->stream = stream; + streams[idx] = it; +} +antd_h2_stream_t* antd_h2_get_stream(antd_h2_stream_list_t* streams, int id) +{ + int idx = id % 2; + antd_h2_stream_list_t it; + + for(it = streams[idx]; it != NULL; it = it->next) + { + if(id == it->stream->id) + return it->stream; + } + return NULL; +} +void antd_h2_close_stream(antd_h2_stream_t* stream) +{ + if(!stream) return; + if(stream->stdin) free(stream->stdin); + if(stream->stdout) free(stream->stdout); + free(stream); +} + +void antd_h2_close_all_streams(antd_h2_stream_list_t streams) +{ + antd_h2_stream_list_t it; + for(it = streams; it != NULL; it = it->next) + { + if(it->stream) + { + antd_h2_close_stream(it->stream); + } + free(it); + } +} +void antd_h2_update_streams_win_sz(antd_h2_stream_list_t streams, int offset) +{ + antd_h2_stream_list_t it; + for(it = streams; it != NULL; it = it->next) + { + if(it->stream) + { + it->stream->win_sz += offset; + } + } +} + +void antd_h2_del_stream(antd_h2_stream_list_t* streams, int id) +{ + int idx = id % 2; + antd_h2_stream_list_t it; + if(streams[idx] && streams[idx]->stream->id == id) + { + it = streams[idx]; + streams[idx] = it->next; + antd_h2_close_stream(it->stream); + free(it); + return; + } + for(it = streams[idx]; it != NULL; it = it->next) + { + if(it->next!= NULL && id == it->next->stream->id) + { + antd_h2_stream_list_t np; + np = it->next; + it->next = np->next; + np->next = NULL; + antd_h2_close_stream(np->stream); + free(np); + return; + } + } } \ No newline at end of file diff --git a/lib/h2.h b/lib/h2.h index 0a9edf7..0e5699e 100644 --- a/lib/h2.h +++ b/lib/h2.h @@ -1,7 +1,9 @@ #ifndef HTTP2_H #define HTTP2_H + #include "handle.h" #include "hpack.h" +#include "queue.h" #define H2_CONN_PREFACE "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" @@ -90,21 +92,65 @@ instead of HTTP/2. */ #define H2_HTTP_1_1_REQUIRED 0xd +/* +The frame should be ignore +*/ +#define H2_IGNORED 0xe + +/* +SETTING FRAME CONSTs +*/ +/* +When set, bit 0 indicates that this frame acknowledges +receipt and application of the peer's SETTINGS frame. When this +bit is set, the payload of the SETTINGS frame MUST be empty. +Receipt of a SETTINGS frame with the ACK flag set and a length +field value other than 0 MUST be treated as a connection error +(Section 5.4.1) of type FRAME_SIZE_ERROR +*/ +#define H2_SETTING_ACK_FLG 0x1 +#define H2_SETTINGS_HEADER_TABLE_SIZE 0x1 +#define H2_SETTINGS_ENABLE_PUSH 0x2 +#define H2_SETTINGS_MAX_CONCURRENT_STREAMS 0x3 +#define H2_SETTINGS_INITIAL_WINDOW_SIZE 0x4 +#define H2_SETTINGS_MAX_FRAME_SIZE 0x5 +#define H2_SETTINGS_MAX_HEADER_LIST_SIZE 0x6 + + +typedef struct{ + uint32_t header_table_sz; + uint32_t enable_push; + uint32_t max_concurrent_streams; + uint32_t init_win_sz; + uint32_t max_frame_sz; + uint32_t max_header_list_sz; +} antd_h2_conn_setting_t; + + +typedef struct antd_h2_stream_list_t* antd_h2_stream_list_t; /** * Struct that holds a * h2 connection */ typedef struct { - + antd_h2_conn_setting_t settings; + antd_h2_stream_list_t* streams; + int win_sz; + int last_stream_id; } antd_h2_conn_t; +#define H2_CONN(rq) ((antd_h2_conn_t*)dvalue(((antd_request_t*)rq)->request,"H2_CONNECTION")) +#define H2_SETTING(rq) (H2_CON(rq)->settings) /** * Struct that holds a * h2 stream */ typedef struct { - + struct queue_root* stdin; + struct queue_root* stdout; + int win_sz; + int id; } antd_h2_stream_t; /** @@ -121,13 +167,21 @@ typedef struct { unsigned int identifier; } antd_h2_frame_header_t; +/*stream utilities functions*/ +void antd_h2_close_stream(antd_h2_stream_t* stream); +void antd_h2_add_stream(antd_h2_stream_list_t*, antd_h2_stream_t*); +antd_h2_stream_t* antd_h2_get_stream(antd_h2_stream_list_t*, int); +void antd_h2_del_stream(antd_h2_stream_list_t*, int); +void antd_h2_close_all_streams(antd_h2_stream_list_t); +void antd_h2_update_streams_win_sz(antd_h2_stream_list_t streams, int offset); +/*Connection utilities funtions*/ +antd_h2_conn_t* antd_h2_open_conn(); +void antd_h2_close_conn(antd_h2_conn_t*); void* antd_h2_read(void* rq); void* antd_h2_write(void* rq); - void* antd_h2_preface_ck(void* rq); - void* antd_h2_handle(void* rq); - +int antd_h2_send_frame(antd_request_t*, antd_h2_frame_header_t*, uint8_t*); #endif \ No newline at end of file diff --git a/lib/handle.c b/lib/handle.c index 6ceb10e..4739038 100644 --- a/lib/handle.c +++ b/lib/handle.c @@ -1,5 +1,4 @@ #include "handle.h" - #define HTML_TPL "%s

%s

" static const char* S_100 = "Continue"; @@ -849,36 +848,4 @@ int read_buf(void* sock, char*buf,int size) } buf[i] = '\0'; return i; -} -/* - We put it here since we want the plugin is able - to destroy the request if it want to - in this case, the plugin should return an empty - with no data -*/ -void destroy_request(void *data) -{ - if (!data) - return; - antd_request_t *rq = (antd_request_t *)data; - //LOG("Close request %d", rq->client->sock); - // free all other thing - if (rq->request) - { - dictionary_t tmp = dvalue(rq->request, "COOKIE"); - if (tmp) - freedict(tmp); - tmp = dvalue(rq->request, "REQUEST_HEADER"); - if (tmp) - freedict(tmp); - tmp = dvalue(rq->request, "REQUEST_DATA"); - if (tmp) - freedict(tmp); - dput(rq->request, "REQUEST_HEADER", NULL); - dput(rq->request, "REQUEST_DATA", NULL); - dput(rq->request, "COOKIE", NULL); - freedict(rq->request); - } - antd_close(rq->client); - free(rq); } \ No newline at end of file diff --git a/lib/handle.h b/lib/handle.h index bd7a995..8054d40 100644 --- a/lib/handle.h +++ b/lib/handle.h @@ -139,5 +139,4 @@ int read_buf(void* sock,char* buf,int i); int antd_send( void *source, const void* data, int len); int antd_recv( void *source, void* data, int len); int antd_close(void* source); -void destroy_request(void *data); #endif diff --git a/lib/queue.c b/lib/queue.c new file mode 100644 index 0000000..11a6865 --- /dev/null +++ b/lib/queue.c @@ -0,0 +1,84 @@ +// code base on: https://github.com/majek/dump/blob/master/msqueue/queue_semiblocking.c +#include +#include +#include + +#include "queue.h" + +#define QUEUE_POISON1 ((void*)0xCAFEBAB5) + +struct queue_root { + struct queue_head *in_queue; + struct queue_head *out_queue; +// pthread_spinlock_t lock; + pthread_mutex_t lock; +}; + + +#ifndef _cas +# define _cas(ptr, oldval, newval) \ + __sync_bool_compare_and_swap(ptr, oldval, newval) +#endif + +struct queue_root *ALLOC_QUEUE_ROOT() +{ + struct queue_root *root = \ + malloc(sizeof(struct queue_root)); + +// pthread_spin_init(&root->lock, PTHREAD_PROCESS_PRIVATE); + pthread_mutex_init(&root->lock, NULL); + + root->in_queue = NULL; + root->out_queue = NULL; + return root; +} + +void INIT_QUEUE_HEAD(struct queue_head *head) +{ + head->next = QUEUE_POISON1; +} + +void queue_put(struct queue_head *new, + struct queue_root *root) +{ + while (1) { + struct queue_head *in_queue = root->in_queue; + new->next = in_queue; + if (_cas(&root->in_queue, in_queue, new)) { + break; + } + } +} + +struct queue_head *queue_get(struct queue_root *root) +{ +// pthread_spin_lock(&root->lock); + pthread_mutex_lock(&root->lock); + + if (!root->out_queue) { + while (1) { + struct queue_head *head = root->in_queue; + if (!head) { + break; + } + if (_cas(&root->in_queue, head, NULL)) { + // Reverse the order + while (head) { + struct queue_head *next = head->next; + head->next = root->out_queue; + root->out_queue = head; + head = next; + } + break; + } + } + } + + struct queue_head *head = root->out_queue; + if (head) { + root->out_queue = head->next; + } +// pthread_spin_unlock(&root->lock); + pthread_mutex_unlock(&root->lock); + return head; +} \ No newline at end of file diff --git a/lib/queue.h b/lib/queue.h new file mode 100644 index 0000000..f920d7c --- /dev/null +++ b/lib/queue.h @@ -0,0 +1,20 @@ +// code based on : https://github.com/majek/dump/blob/master/msqueue/queue.h + +#ifndef QUEUE_H +#define QUEUE_H + +struct queue_root; + +struct queue_head { + void* data; + struct queue_head *next; +}; + +struct queue_root *ALLOC_QUEUE_ROOT(); +void INIT_QUEUE_HEAD(struct queue_head *head); + +void queue_put(struct queue_head *,struct queue_root *); + +struct queue_head *queue_get(struct queue_root *root); + +#endif // QUEUE_H