From dad3b25fc8fb87692d38ef02c60d0e0003337ed0 Mon Sep 17 00:00:00 2001 From: lxsang Date: Wed, 29 Jan 2020 17:09:16 +0100 Subject: [PATCH] cont --- http_server.c | 16 +-- httpd.c | 32 +++--- lib/h2.c | 262 +++++++++++++++++++++++++++++++++++++++++--------- lib/h2.h | 35 ++++++- lib/handle.c | 39 ++++---- lib/handle.h | 6 +- lib/queue.c | 18 ++++ lib/queue.h | 3 +- lib/ws.c | 12 +-- 9 files changed, 325 insertions(+), 98 deletions(-) diff --git a/http_server.c b/http_server.c index 16ff1ae..09cd337 100644 --- a/http_server.c +++ b/http_server.c @@ -263,20 +263,20 @@ void *accept_request(void *data) // first verify if the socket is ready antd_client_t *client = (antd_client_t *)rq->client; FD_ZERO(&read_flags); - FD_SET(rq->client->sock, &read_flags); + FD_SET(rq->client->id, &read_flags); FD_ZERO(&write_flags); - FD_SET(rq->client->sock, &write_flags); + FD_SET(rq->client->id, &write_flags); struct timeval timeout; timeout.tv_sec = 0; timeout.tv_usec = 500; // select - int sel = select(client->sock + 1, &read_flags, &write_flags, (fd_set *)0, &timeout); + int sel = select(client->id + 1, &read_flags, &write_flags, (fd_set *)0, &timeout); if (sel == -1) { antd_error(rq->client, 400, "Bad request"); return task; } - if (sel == 0 || (!FD_ISSET(client->sock, &read_flags) && !FD_ISSET(client->sock, &write_flags))) + if (sel == 0 || (!FD_ISSET(client->id, &read_flags) && !FD_ISSET(client->id, &write_flags))) { task->handle = accept_request; return task; @@ -284,12 +284,12 @@ void *accept_request(void *data) // perform the ssl handshake if enabled #ifdef USE_OPENSSL int ret = -1, stat; - if (client->ssl && !(client->flags & CLIENT_FL_ACCEPTED) ) + if (client->stream && !(client->flags & CLIENT_FL_ACCEPTED) ) { //LOG("Atttempt %d\n", client->attempt); - if (SSL_accept((SSL *)client->ssl) == -1) + if (SSL_accept((SSL *)client->stream) == -1) { - stat = SSL_get_error((SSL *)client->ssl, ret); + stat = SSL_get_error((SSL *)client->stream, ret); switch (stat) { case SSL_ERROR_WANT_READ: @@ -312,7 +312,7 @@ void *accept_request(void *data) } else { - if (!FD_ISSET(client->sock, &read_flags)) + if (!FD_ISSET(client->id, &read_flags)) { client->flags |= CLIENT_FL_ACCEPTED; task->handle = accept_request; diff --git a/httpd.c b/httpd.c index 6278669..3b04d4a 100644 --- a/httpd.c +++ b/httpd.c @@ -6,6 +6,12 @@ static antd_scheduler_t scheduler; + +void antd_schedule_task(antd_task_t* task) +{ + antd_add_task(&scheduler,task); +} + #ifdef USE_OPENSSL // define the cipher suit used @@ -186,15 +192,15 @@ static int is_task_ready(antd_task_t* task) fd_set read_flags, write_flags; struct timeval timeout; FD_ZERO(&read_flags); - FD_SET(rq->client->sock, &read_flags); + FD_SET(rq->client->id, &read_flags); FD_ZERO(&write_flags); - FD_SET(rq->client->sock, &write_flags); + FD_SET(rq->client->id, &write_flags); timeout.tv_sec = 0; timeout.tv_usec = 0; - int sel = select(rq->client->sock + 1, &read_flags, &write_flags, (fd_set *)0, &timeout); - if(sel > 0 && (FD_ISSET(rq->client->sock, &read_flags)|| FD_ISSET(rq->client->sock, &write_flags))) + int sel = select(rq->client->id + 1, &read_flags, &write_flags, (fd_set *)0, &timeout); + if(sel > 0 && (FD_ISSET(rq->client->id, &read_flags)|| FD_ISSET(rq->client->id, &write_flags))) { - if(FD_ISSET(rq->client->sock, &read_flags)) + if(FD_ISSET(rq->client->id, &read_flags)) { rq->client->flags |= CLIENT_FL_READABLE; } @@ -203,7 +209,7 @@ static int is_task_ready(antd_task_t* task) rq->client->flags &= ~CLIENT_FL_READABLE; } - if(FD_ISSET(rq->client->sock, &write_flags)) + if(FD_ISSET(rq->client->id, &write_flags)) { rq->client->flags |= CLIENT_FL_WRITABLE; } @@ -368,22 +374,22 @@ int main(int argc, char* argv[]) if (setsockopt (client_sock, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout,sizeof(timeout)) < 0) perror("setsockopt failed\n"); */ - client->sock = client_sock; + client->id = client_sock; time(&client->last_io); - client->ssl = NULL; + client->stream = NULL; // default selected protocol is http/1.1 client->flags = CLIENT_FL_HTTP_1_1; #ifdef USE_OPENSSL if(pcnf->usessl == 1) { - client->ssl = (void*)SSL_new(ctx); - if(!client->ssl) continue; - SSL_set_fd((SSL*)client->ssl, client->sock); + client->stream = (void*)SSL_new(ctx); + if(!client->stream) continue; + SSL_set_fd((SSL*)client->stream, client->id); // this can be used in the protocol select callback to // set the protocol selected by the server - if(!SSL_set_ex_data((SSL*)client->ssl, client->sock, client)) + if(!SSL_set_ex_data((SSL*)client->stream, client->id, client)) { - ERROR("Cannot set ex data to ssl client:%d", client->sock); + ERROR("Cannot set ex data to ssl client:%d", client->id); } } #endif diff --git a/lib/h2.c b/lib/h2.c index d73268d..23bccd8 100644 --- a/lib/h2.c +++ b/lib/h2.c @@ -75,7 +75,7 @@ static int process_setting_frame(antd_request_t* rq, antd_h2_frame_header_t* fra ptr += 2; memcpy(¶m_val,ptr,4); param_val = ntohl(param_val); - printf("id: %d val: %d\n", param_id, param_val); + //printf("id: %d val: %d\n", param_id, param_val); switch (param_id) { case H2_SETTINGS_HEADER_TABLE_SIZE: @@ -162,6 +162,160 @@ static int process_window_update_frame(antd_request_t* rq, antd_h2_frame_header_ return H2_NO_ERROR; } +static void antd_h2_error(void* source,int stream_id, int error_code) +{ + // send error frame + antd_h2_conn_t* conn = H2_CONN(source); + if(!conn) return; + antd_h2_frame_header_t header; + header.identifier = stream_id; + //header.type = stat; + header.flags = 0; + header.length = 8; + uint8_t error_body[8]; + int tmp = htonl(conn->last_stream_id); + memcpy(error_body, &tmp , 4 ); + tmp = htonl(error_code); + memcpy(error_body + 4, &tmp ,4); + header.type = H2_FRM_RST_STREAM; + if(stream_id == 0) + header.type = H2_FRM_GOAWAY; + antd_h2_send_frame(source,&header,error_body); +} + +antd_h2_stream_t* antd_h2_init_stream(int id, int wsz) +{ + antd_h2_stream_t* stream = (antd_h2_stream_t*) malloc(sizeof(antd_h2_stream_t)); + if(!stream) return NULL; + stream->id = id; + stream->win_sz = wsz; + stream->state = H2_STR_IDLE; + stream->stdin = ALLOC_QUEUE_ROOT(); + stream->stdout = ALLOC_QUEUE_ROOT(); + //stream->flags = 0; + stream->dependency = 0; + stream->weight = 255; + return stream; +} + +antd_request_t* antd_h2_request_init(antd_request_t* rq, antd_h2_stream_t* stream) +{ + antd_client_t* client = (antd_client_t*)malloc(sizeof(antd_client_t)); + antd_request_t* h2rq = (antd_request_t*)malloc(sizeof(*h2rq)); + h2rq->client = client; + h2rq->request = dict(); + client->zstream = NULL; + client->z_level = ANTD_CNONE; + + dictionary_t h2xheader = dict(); + dictionary_t xheader = (dictionary_t)dvalue(rq->request,"REQUEST_HEADER"); + + dput(h2rq->request, "REQUEST_HEADER", h2xheader); + dput(h2rq->request, "REQUEST_DATA", dict()); + dput_static(h2xheader, "SERVER_PORT", dvalue(xheader,"SERVER_PORT")); + dput_static(h2xheader, "SERVER_WWW_ROOT", dvalue(xheader,"SERVER_WWW_ROOT")); + dput_static(h2xheader, "REMOTE_ADDR", dvalue(xheader,"REMOTE_ADDR")); + client->id = stream->id; + time(&client->last_io); + client->stream = stream; + client->flags = CLIENT_FL_ACCEPTED | CLIENT_FL_H2_STREAM; + return h2rq; +} + +static void* antd_h2_stream_handle(void* data) +{ + antd_request_t* rq = (antd_request_t*) data; + antd_h2_stream_t* stream = (antd_h2_stream_t*) rq->client->stream; + // transition state here + // TODO: next day + return NULL; +} + + +static int process_header_frame(antd_request_t* rq, antd_h2_frame_header_t* frame_h) +{ + if(frame_h->length == 0 || frame_h->identifier == 0) + { + return H2_PROTOCOL_ERROR; + } + uint8_t* data = (uint8_t*) malloc(frame_h->length); + if(!data) + { + return H2_INTERNAL_ERROR; + } + if(antd_recv(rq->client,data,frame_h->length) != (int)frame_h->length) + { + free(data); + return H2_PROTOCOL_ERROR; + } + // now parse the stream + antd_h2_conn_t* conn = H2_CONN(rq); + if(!conn) + { + free(data); + return H2_INTERNAL_ERROR; + } + int is_new_stream = 0; + antd_h2_stream_t* stream = antd_h2_get_stream(conn->streams,frame_h->identifier); + if(stream == NULL) + { + stream = antd_h2_init_stream(frame_h->identifier,conn->settings.init_win_sz); + antd_h2_add_stream(conn->streams,stream); + is_new_stream = 1; + } + if( stream->state == H2_STR_IDLE || stream->state == H2_STR_REV_LOC || stream->state == H2_STR_OPEN || stream->state == H2_STR_HALF_CLOSED_REM ) + { + antd_h2_frame_t* frame = (antd_h2_frame_t*) malloc(sizeof(antd_h2_frame_t)); + frame->header = *frame_h; + frame->pageload = data; + h2_stream_io_put(stream,frame); + if(is_new_stream) + { + // TODO create new request + // just dump the scheduler when we have a connection + antd_schedule_task( antd_create_task(antd_h2_stream_handle, antd_h2_request_init(rq, stream) , NULL, time(NULL))); + } + return H2_NO_ERROR; + } + else + { + free(data); + return H2_PROTOCOL_ERROR; + } +} + +antd_h2_frame_t* h2_streamio_get(struct queue_root* io) +{ + struct queue_head* head = queue_get(io); + if(!head) return NULL; + antd_h2_frame_t* frame = (antd_h2_frame_t*)head->data; + free(head); + return frame; +} + +void h2_stream_io_put(antd_h2_stream_t* stream, antd_h2_frame_t* frame) +{ + struct queue_head* head = (struct queue_head*) malloc(sizeof(struct queue_head)); + INIT_QUEUE_HEAD(head); + head->data = (void*)frame; + if(frame->header.identifier % 2 == 0) + { + queue_put(head, stream->stdin); + } + else + { + queue_put(head, stream->stdout); + } +} + + +void antd_h2_destroy_frame(antd_h2_frame_t* frame) +{ + if(frame->pageload) + free(frame->pageload); + free(frame); +} + static int process_frame(void* source, antd_h2_frame_header_t* frame_h) { int stat; @@ -173,6 +327,9 @@ static int process_frame(void* source, antd_h2_frame_header_t* frame_h) case H2_FRM_WINDOW_UPDATE: stat = process_window_update_frame(source,frame_h); break; + case H2_FRM_HEADER: + stat = process_header_frame(source, frame_h); + break; default: printf("Frame: %d, length: %d id: %d\n", frame_h->type, frame_h->length, frame_h->identifier); stat = H2_IGNORED; @@ -180,25 +337,7 @@ static int process_frame(void* source, antd_h2_frame_header_t* frame_h) if(stat == H2_NO_ERROR || stat == H2_IGNORED) return stat; - antd_h2_conn_t* conn = H2_CONN(source); - if(conn) - { - // send error frame - antd_h2_frame_header_t header; - header.identifier = frame_h->identifier; - //header.type = stat; - header.flags = 0; - header.length = 8; - uint8_t error_body[8]; - int tmp = htonl(conn->last_stream_id); - memcpy(error_body, &tmp , 4 ); - tmp = htonl(stat); - memcpy(error_body + 4, &tmp ,4); - header.type = H2_FRM_RST_STREAM; - if(frame_h->identifier == 0) - header.type = H2_FRM_GOAWAY; - antd_h2_send_frame(source,&header,error_body); - } + antd_h2_error(source, frame_h->identifier,stat); return stat; } @@ -213,15 +352,17 @@ void* antd_h2_preface_ck(void* data) { // TODO servers MUST treat an invalid connection preface as a // connection error (Section 5.4.1) of type PROTOCOL_ERROR - ERROR("Unable to read preface for client %d: [%s]",rq->client->sock,buf); + ERROR("Unable to read preface for client %d: [%s]",rq->client->id,buf); + antd_h2_error(data,0, H2_PROTOCOL_ERROR); return antd_empty_task((void *)rq,rq->client->last_io); } buf[24] = '\0'; if(strcmp(buf, H2_CONN_PREFACE) != 0) { - ERROR("Connection preface is not correct for client %d: [%s]",rq->client->sock,buf); + ERROR("Connection preface is not correct for client %d: [%s]",rq->client->id,buf); // TODO servers MUST treat an invalid connection preface as a // connection error (Section 5.4.1) of type PROTOCOL_ERROR + antd_h2_error(data,0, H2_PROTOCOL_ERROR); return antd_empty_task((void *)rq, rq->client->last_io); } // read the setting frame @@ -230,8 +371,8 @@ void* antd_h2_preface_ck(void* data) // TODO: frame error // // send go away with PROTOCOL_ERROR - printf("error reading setting frame\n"); - ERROR("Unable to read setting frame from client %d",rq->client->sock); + ERROR("Unable to read setting frame from client %d",rq->client->id); + antd_h2_error(data,0, H2_PROTOCOL_ERROR); return antd_empty_task((void *)rq, rq->client->last_io); } // create a connection @@ -248,18 +389,15 @@ void* antd_h2_preface_ck(void* data) header.identifier = 0; if(antd_h2_send_frame(rq, &header,NULL)) { - printf("frame sent\n"); return antd_create_task(antd_h2_handle, (void *)rq, NULL, rq->client->last_io); } else { - printf("cannot send frame\n"); return antd_empty_task(data, rq->client->last_io); } } else { - printf("Error process frame %d\n", stat); return antd_empty_task(data, rq->client->last_io); } } @@ -267,14 +405,20 @@ void* antd_h2_preface_ck(void* data) void* antd_h2_handle(void* data) { antd_request_t* rq = (antd_request_t*) data; - antd_task_t* task; + antd_task_t* task = NULL; if(rq->client->flags & CLIENT_FL_READABLE) { - antd_h2_read(data); + if(!antd_h2_read(data)) + { + return antd_empty_task(data, rq->client->last_io); + } } if(rq->client->flags & CLIENT_FL_WRITABLE) { - antd_h2_write(data); + if(!antd_h2_write(data)) + { + return antd_empty_task(data, rq->client->last_io); + } } task = antd_create_task(antd_h2_handle, (void *)rq, NULL, rq->client->last_io); @@ -285,30 +429,34 @@ void* antd_h2_handle(void* data) -void* antd_h2_read(void* data) +int antd_h2_read(void* data) { antd_h2_frame_header_t frame_h; antd_request_t* rq = (antd_request_t*) data; if(!antd_h2_read_frame_header(rq->client, &frame_h)) { - // TODO: frame error // send goaway frame - ERROR("Unable to read frame from client %d",rq->client->sock); - return antd_empty_task(data, rq->client->last_io); + ERROR("Unable to read frame from client %d",rq->client->id); + antd_h2_error(data, 0, H2_PROTOCOL_ERROR); + return 0; } - process_frame(data, &frame_h); - return antd_empty_task(data, rq->client->last_io); + int stat = process_frame(data, &frame_h); + if(stat == H2_NO_ERROR || stat == H2_IGNORED) + return 1; + + return 0; } -void* antd_h2_write(void* data) +int antd_h2_write(void* data) { - antd_request_t* rq = (antd_request_t*) data; + UNUSED(data); + //antd_request_t* rq = (antd_request_t*) data; //printf("write task\n"); - return antd_empty_task(data, rq->client->last_io); + return 1; } antd_h2_conn_t* antd_h2_open_conn() { - antd_h2_conn_t* conn = (antd_h2_conn_t*) malloc(sizeof(conn)); + antd_h2_conn_t* conn = (antd_h2_conn_t*) malloc(sizeof(antd_h2_conn_t)); if(! conn) return NULL; @@ -321,7 +469,11 @@ antd_h2_conn_t* antd_h2_open_conn() conn->win_sz = conn->settings.init_win_sz; conn->last_stream_id = 0; conn->streams = (antd_h2_stream_list_t*) malloc(2*sizeof(antd_h2_stream_list_t)); - + if(conn->streams) + { + conn->streams[0] = NULL; + conn->streams[1] = NULL; + } return conn; } @@ -333,8 +485,8 @@ void antd_h2_close_conn(antd_h2_conn_t* conn) if(conn->streams) { - antd_h2_close_all_streams(conn->streams[0]); - antd_h2_close_all_streams(conn->streams[1]); + antd_h2_close_all_streams(&conn->streams[0]); + antd_h2_close_all_streams(&conn->streams[1]); free(conn->streams); } free(conn); @@ -398,19 +550,31 @@ antd_h2_stream_t* antd_h2_get_stream(antd_h2_stream_list_t* streams, int id) void antd_h2_close_stream(antd_h2_stream_t* stream) { if(!stream) return; - if(stream->stdin) free(stream->stdin); - if(stream->stdout) free(stream->stdout); - free(stream); + // TODO empty the queue + if(stream->stdin) + { + queue_empty(stream->stdin, (void (*)(void*))antd_h2_destroy_frame); + free(stream->stdin); + } + if(stream->stdout) + { + queue_empty(stream->stdout, (void (*)(void*))antd_h2_destroy_frame); + free(stream->stdout); + } + //free(stream); } -void antd_h2_close_all_streams(antd_h2_stream_list_t streams) +void antd_h2_close_all_streams(antd_h2_stream_list_t* streams) { antd_h2_stream_list_t it; - for(it = streams; it != NULL; it = it->next) + while((*streams) != NULL) { + it = *streams; + (*streams) = (*streams)->next; if(it->stream) { antd_h2_close_stream(it->stream); + free(it->stream); } free(it); } @@ -436,6 +600,7 @@ void antd_h2_del_stream(antd_h2_stream_list_t* streams, int id) it = streams[idx]; streams[idx] = it->next; antd_h2_close_stream(it->stream); + free(it->stream); free(it); return; } @@ -448,6 +613,7 @@ void antd_h2_del_stream(antd_h2_stream_list_t* streams, int id) it->next = np->next; np->next = NULL; antd_h2_close_stream(np->stream); + free(np->stream); free(np); return; } diff --git a/lib/h2.h b/lib/h2.h index 0e5699e..fc66f1b 100644 --- a/lib/h2.h +++ b/lib/h2.h @@ -117,6 +117,17 @@ field value other than 0 MUST be treated as a connection error #define H2_SETTINGS_MAX_HEADER_LIST_SIZE 0x6 +// stream state +typedef enum { + H2_STR_IDLE, + H2_STR_OPEN, + H2_STR_REV_LOC, + H2_STR_REV_REM, + H2_STR_HALF_CLOSED_LOC, + H2_STR_HALF_CLOSED_REM, + H2_STR_CLOSED +} antd_h2_stream_state_t; + typedef struct{ uint32_t header_table_sz; uint32_t enable_push; @@ -150,6 +161,10 @@ typedef struct { struct queue_root* stdin; struct queue_root* stdout; int win_sz; + antd_h2_stream_state_t state; + //uint8_t flags; + int dependency; + uint8_t weight; int id; } antd_h2_stream_t; @@ -167,20 +182,34 @@ typedef struct { unsigned int identifier; } antd_h2_frame_header_t; +typedef struct { + antd_h2_frame_header_t header; + uint8_t* pageload; +} antd_h2_frame_t; + + +/*Frame utilities functions*/ +void antd_h2_destroy_frame(antd_h2_frame_t*); + /*stream utilities functions*/ +antd_h2_stream_t* antd_h2_init_stream(int id, int wsz); void antd_h2_close_stream(antd_h2_stream_t* stream); void antd_h2_add_stream(antd_h2_stream_list_t*, antd_h2_stream_t*); antd_h2_stream_t* antd_h2_get_stream(antd_h2_stream_list_t*, int); void antd_h2_del_stream(antd_h2_stream_list_t*, int); -void antd_h2_close_all_streams(antd_h2_stream_list_t); +void antd_h2_close_all_streams(antd_h2_stream_list_t*); void antd_h2_update_streams_win_sz(antd_h2_stream_list_t streams, int offset); +void h2_stream_io_put(antd_h2_stream_t*, antd_h2_frame_t*); +antd_h2_frame_t* h2_streamio_get(struct queue_root*); +antd_request_t* antd_h2_request_init(antd_request_t*, antd_h2_stream_t*); + /*Connection utilities funtions*/ antd_h2_conn_t* antd_h2_open_conn(); void antd_h2_close_conn(antd_h2_conn_t*); -void* antd_h2_read(void* rq); -void* antd_h2_write(void* rq); +int antd_h2_read(void* rq); +int antd_h2_write(void* rq); void* antd_h2_preface_ck(void* rq); void* antd_h2_handle(void* rq); int antd_h2_send_frame(antd_request_t*, antd_h2_frame_header_t*, uint8_t*); diff --git a/lib/handle.c b/lib/handle.c index 4739038..96179ad 100644 --- a/lib/handle.c +++ b/lib/handle.c @@ -104,6 +104,11 @@ void plugindir(char* dest) UNUSED(dest); } +void antd_schedule_task(antd_task_t* task) +{ + UNUSED(task); +} + const char* get_status_str(int stat) { switch(stat) @@ -326,7 +331,7 @@ int antd_send(void *src, const void* data_in, int len_in) int count; #ifdef USE_OPENSSL - if(source->ssl) + if(source->stream) { //LOG("SSL WRITE\n"); //ret = SSL_write((SSL*) source->ssl, data, len); @@ -339,8 +344,8 @@ int antd_send(void *src, const void* data_in, int len_in) { // clear the error queue ERR_clear_error(); - count = SSL_write (source->ssl, ptr+written, writelen); - int err = SSL_get_error(source->ssl, count); + count = SSL_write (source->stream, ptr+written, writelen); + int err = SSL_get_error(source->stream, count); if (count > 0) { written += count; @@ -378,7 +383,7 @@ int antd_send(void *src, const void* data_in, int len_in) // no data available right now, wait a few seconds in case new data arrives... //printf("SSL_ERROR_WANT_READ\n"); - int sock = SSL_get_rfd(source->ssl); + int sock = SSL_get_rfd(source->stream); FD_ZERO(&fds); FD_SET(sock, &fds); @@ -398,7 +403,7 @@ int antd_send(void *src, const void* data_in, int len_in) { // socket not writable right now, wait a few seconds and try again... //printf("SSL_ERROR_WANT_WRITE \n"); - int sock = SSL_get_wfd(source->ssl); + int sock = SSL_get_wfd(source->stream); FD_ZERO(&fds); FD_SET(sock, &fds); @@ -437,7 +442,7 @@ int antd_send(void *src, const void* data_in, int len_in) written = 0; while (writelen > 0) { - count = send(source->sock, ptr+written, writelen, 0); + count = send(source->id, ptr+written, writelen, 0); if (count > 0) { written += count; @@ -468,7 +473,7 @@ int antd_recv(void *src, void* data, int len) int readlen=0; antd_client_t * source = (antd_client_t *) src; #ifdef USE_OPENSSL - if(source->ssl) + if(source->stream) { ptr = (char* )data; readlen = len > BUFFLEN?BUFFLEN:len; @@ -478,8 +483,8 @@ int antd_recv(void *src, void* data, int len) while (readlen > 0 )//&& source->attempt < MAX_ATTEMPT { ERR_clear_error(); - received = SSL_read (source->ssl, ptr+read, readlen); - int err = SSL_get_error(source->ssl, received); + received = SSL_read (source->stream, ptr+read, readlen); + int err = SSL_get_error(source->stream, received); if (received > 0) { read += received; @@ -518,7 +523,7 @@ int antd_recv(void *src, void* data, int len) // no data available right now, wait a few seconds in case new data arrives... //printf("SSL_ERROR_WANT_READ\n"); - int sock = SSL_get_rfd(source->ssl); + int sock = SSL_get_rfd(source->stream); FD_ZERO(&fds); FD_SET(sock, &fds); @@ -538,7 +543,7 @@ int antd_recv(void *src, void* data, int len) { // socket not writable right now, wait a few seconds and try again... //printf("SSL_ERROR_WANT_WRITE \n"); - int sock = SSL_get_wfd(source->ssl); + int sock = SSL_get_wfd(source->stream); FD_ZERO(&fds); FD_SET(sock, &fds); @@ -595,7 +600,7 @@ int antd_recv(void *src, void* data, int len) read = 0; while (readlen > 0 ) { - received = recv(((int) source->sock), ptr+read, readlen, 0); + received = recv(((int) source->id), ptr+read, readlen, 0); //LOG("Read : %c\n", *ptr); if (received > 0) { @@ -656,24 +661,24 @@ int antd_close(void* src) } #endif #ifdef USE_OPENSSL - if(source->ssl){ + if(source->stream){ //printf("SSL:Shutdown ssl\n"); //SSL_shutdown((SSL*) source->ssl); - SSL_set_shutdown((SSL*) source->ssl, SSL_SENT_SHUTDOWN|SSL_RECEIVED_SHUTDOWN); + SSL_set_shutdown((SSL*) source->stream, SSL_SENT_SHUTDOWN|SSL_RECEIVED_SHUTDOWN); //printf("SSL:Free ssl\n"); - SSL_free((SSL*) source->ssl); + SSL_free((SSL*) source->stream); //EVP_cleanup(); //ENGINE_cleanup(); CRYPTO_cleanup_all_ex_data(); ERR_remove_state(0); ERR_free_strings(); - source->ssl = NULL; + source->stream = NULL; //LOG("Freeing SSL\n"); } #endif //printf("Close sock %d\n", source->sock); - int ret = close(source->sock); + int ret = close(source->id); free(src); src = NULL; return ret; diff --git a/lib/handle.h b/lib/handle.h index 8054d40..c33065b 100644 --- a/lib/handle.h +++ b/lib/handle.h @@ -44,6 +44,7 @@ typedef enum {ANTD_CGZ, ANTD_CDEFL, ANTD_CNONE} antd_compress_t; #define CLIENT_FL_HTTP_1_1 0x04 #define CLIENT_FL_READABLE 0x08 #define CLIENT_FL_WRITABLE 0x10 +#define CLIENT_FL_H2_STREAM 0x20 typedef struct { unsigned int port; @@ -54,8 +55,8 @@ typedef struct { } port_config_t; typedef struct{ - int sock; - void* ssl; + int id; + void* stream; uint8_t flags; time_t last_io; // compress option @@ -119,6 +120,7 @@ void __attribute__((weak)) dbdir(char* dest); void __attribute__((weak)) tmpdir(char* dest); void __attribute__((weak)) plugindir(char* dest); int __attribute__((weak)) compressable(char* ctype); +void __attribute__((weak)) antd_schedule_task(antd_task_t*); void set_nonblock(int socket); //void set_block(int socket); diff --git a/lib/queue.c b/lib/queue.c index 11a6865..20c4bd4 100644 --- a/lib/queue.c +++ b/lib/queue.c @@ -50,6 +50,24 @@ void queue_put(struct queue_head *new, } } +int queue_readable(struct queue_root *root) +{ + return root->out_queue != NULL && root->out_queue != QUEUE_POISON1; +} + +void queue_empty(struct queue_root * root, void (* fn)(void*)) +{ + struct queue_head* head = NULL; + while((head = queue_get(root)) != NULL) + { + if(fn) + { + fn(head->data); + } + free(head); + } +} + struct queue_head *queue_get(struct queue_root *root) { // pthread_spin_lock(&root->lock); diff --git a/lib/queue.h b/lib/queue.h index f920d7c..f398001 100644 --- a/lib/queue.h +++ b/lib/queue.h @@ -14,7 +14,8 @@ struct queue_root *ALLOC_QUEUE_ROOT(); void INIT_QUEUE_HEAD(struct queue_head *head); void queue_put(struct queue_head *,struct queue_root *); - +int queue_readable(struct queue_root *); +void queue_empty(struct queue_root *, void (*)(void*) ); struct queue_head *queue_get(struct queue_root *root); #endif // QUEUE_H diff --git a/lib/ws.c b/lib/ws.c index 3b2ba0f..df4c91c 100644 --- a/lib/ws.c +++ b/lib/ws.c @@ -402,7 +402,7 @@ int ws_client_connect(ws_client_t* wsclient, port_config_t pcnf) return -1; } // will be free - wsclient->antdsock->sock = sock; + wsclient->antdsock->id = sock; wsclient->antdsock->flags = 0; wsclient->antdsock->last_io = time(NULL); wsclient->antdsock->zstream = NULL; @@ -486,19 +486,19 @@ int ws_client_connect(ws_client_t* wsclient, port_config_t pcnf) SSL_CTX_set_verify(wsclient->ssl_ctx, SSL_VERIFY_NONE, NULL); } - wsclient->antdsock->ssl = (void*)SSL_new(wsclient->ssl_ctx); - if(!wsclient->antdsock->ssl) + wsclient->antdsock->stream = (void*)SSL_new(wsclient->ssl_ctx); + if(!wsclient->antdsock->stream) { ssl_err = ERR_get_error(); ERROR("SSL_new: %s", ERR_error_string(ssl_err, NULL)); return -1; } - SSL_set_fd((SSL*)wsclient->antdsock->ssl, wsclient->antdsock->sock); + SSL_set_fd((SSL*)wsclient->antdsock->stream, wsclient->antdsock->id); int stat, ret; ERR_clear_error(); - while( (ret = SSL_connect(wsclient->antdsock->ssl)) <= 0) + while( (ret = SSL_connect(wsclient->antdsock->stream)) <= 0) { - stat = SSL_get_error(wsclient->antdsock->ssl, ret); + stat = SSL_get_error(wsclient->antdsock->stream, ret); switch (stat) { case SSL_ERROR_WANT_READ: