This commit is contained in:
lxsang 2020-01-29 17:09:16 +01:00
parent 894f6d0f31
commit cbe7c80a8a
9 changed files with 325 additions and 98 deletions

View File

@ -336,20 +336,20 @@ void *accept_request(void *data)
// first verify if the socket is ready // first verify if the socket is ready
antd_client_t *client = (antd_client_t *)rq->client; antd_client_t *client = (antd_client_t *)rq->client;
FD_ZERO(&read_flags); FD_ZERO(&read_flags);
FD_SET(rq->client->sock, &read_flags); FD_SET(rq->client->id, &read_flags);
FD_ZERO(&write_flags); FD_ZERO(&write_flags);
FD_SET(rq->client->sock, &write_flags); FD_SET(rq->client->id, &write_flags);
struct timeval timeout; struct timeval timeout;
timeout.tv_sec = 0; timeout.tv_sec = 0;
timeout.tv_usec = 500; timeout.tv_usec = 500;
// select // select
int sel = select(client->sock + 1, &read_flags, &write_flags, (fd_set *)0, &timeout); int sel = select(client->id + 1, &read_flags, &write_flags, (fd_set *)0, &timeout);
if (sel == -1) if (sel == -1)
{ {
antd_error(rq->client, 400, "Bad request"); antd_error(rq->client, 400, "Bad request");
return task; return task;
} }
if (sel == 0 || (!FD_ISSET(client->sock, &read_flags) && !FD_ISSET(client->sock, &write_flags))) if (sel == 0 || (!FD_ISSET(client->id, &read_flags) && !FD_ISSET(client->id, &write_flags)))
{ {
task->handle = accept_request; task->handle = accept_request;
return task; return task;
@ -357,12 +357,12 @@ void *accept_request(void *data)
// perform the ssl handshake if enabled // perform the ssl handshake if enabled
#ifdef USE_OPENSSL #ifdef USE_OPENSSL
int ret = -1, stat; int ret = -1, stat;
if (client->ssl && !(client->flags & CLIENT_FL_ACCEPTED) ) if (client->stream && !(client->flags & CLIENT_FL_ACCEPTED) )
{ {
//LOG("Atttempt %d\n", client->attempt); //LOG("Atttempt %d\n", client->attempt);
if (SSL_accept((SSL *)client->ssl) == -1) if (SSL_accept((SSL *)client->stream) == -1)
{ {
stat = SSL_get_error((SSL *)client->ssl, ret); stat = SSL_get_error((SSL *)client->stream, ret);
switch (stat) switch (stat)
{ {
case SSL_ERROR_WANT_READ: case SSL_ERROR_WANT_READ:
@ -385,7 +385,7 @@ void *accept_request(void *data)
} }
else else
{ {
if (!FD_ISSET(client->sock, &read_flags)) if (!FD_ISSET(client->id, &read_flags))
{ {
client->flags |= CLIENT_FL_ACCEPTED; client->flags |= CLIENT_FL_ACCEPTED;
task->handle = accept_request; task->handle = accept_request;

32
httpd.c
View File

@ -6,6 +6,12 @@
static antd_scheduler_t scheduler; static antd_scheduler_t scheduler;
void antd_schedule_task(antd_task_t* task)
{
antd_add_task(&scheduler,task);
}
#ifdef USE_OPENSSL #ifdef USE_OPENSSL
// define the cipher suit used // define the cipher suit used
@ -184,15 +190,15 @@ static int is_task_ready(antd_task_t* task)
fd_set read_flags, write_flags; fd_set read_flags, write_flags;
struct timeval timeout; struct timeval timeout;
FD_ZERO(&read_flags); FD_ZERO(&read_flags);
FD_SET(rq->client->sock, &read_flags); FD_SET(rq->client->id, &read_flags);
FD_ZERO(&write_flags); FD_ZERO(&write_flags);
FD_SET(rq->client->sock, &write_flags); FD_SET(rq->client->id, &write_flags);
timeout.tv_sec = 0; timeout.tv_sec = 0;
timeout.tv_usec = 0; timeout.tv_usec = 0;
int sel = select(rq->client->sock + 1, &read_flags, &write_flags, (fd_set *)0, &timeout); int sel = select(rq->client->id + 1, &read_flags, &write_flags, (fd_set *)0, &timeout);
if(sel > 0 && (FD_ISSET(rq->client->sock, &read_flags)|| FD_ISSET(rq->client->sock, &write_flags))) if(sel > 0 && (FD_ISSET(rq->client->id, &read_flags)|| FD_ISSET(rq->client->id, &write_flags)))
{ {
if(FD_ISSET(rq->client->sock, &read_flags)) if(FD_ISSET(rq->client->id, &read_flags))
{ {
rq->client->flags |= CLIENT_FL_READABLE; rq->client->flags |= CLIENT_FL_READABLE;
} }
@ -201,7 +207,7 @@ static int is_task_ready(antd_task_t* task)
rq->client->flags &= ~CLIENT_FL_READABLE; rq->client->flags &= ~CLIENT_FL_READABLE;
} }
if(FD_ISSET(rq->client->sock, &write_flags)) if(FD_ISSET(rq->client->id, &write_flags))
{ {
rq->client->flags |= CLIENT_FL_WRITABLE; rq->client->flags |= CLIENT_FL_WRITABLE;
} }
@ -363,22 +369,22 @@ int main(int argc, char* argv[])
if (setsockopt (client_sock, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout,sizeof(timeout)) < 0) if (setsockopt (client_sock, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout,sizeof(timeout)) < 0)
perror("setsockopt failed\n"); perror("setsockopt failed\n");
*/ */
client->sock = client_sock; client->id = client_sock;
time(&client->last_io); time(&client->last_io);
client->ssl = NULL; client->stream = NULL;
// default selected protocol is http/1.1 // default selected protocol is http/1.1
client->flags = CLIENT_FL_HTTP_1_1; client->flags = CLIENT_FL_HTTP_1_1;
#ifdef USE_OPENSSL #ifdef USE_OPENSSL
if(pcnf->usessl == 1) if(pcnf->usessl == 1)
{ {
client->ssl = (void*)SSL_new(ctx); client->stream = (void*)SSL_new(ctx);
if(!client->ssl) continue; if(!client->stream) continue;
SSL_set_fd((SSL*)client->ssl, client->sock); SSL_set_fd((SSL*)client->stream, client->id);
// this can be used in the protocol select callback to // this can be used in the protocol select callback to
// set the protocol selected by the server // set the protocol selected by the server
if(!SSL_set_ex_data((SSL*)client->ssl, client->sock, client)) if(!SSL_set_ex_data((SSL*)client->stream, client->id, client))
{ {
ERROR("Cannot set ex data to ssl client:%d", client->sock); ERROR("Cannot set ex data to ssl client:%d", client->id);
} }
} }
#endif #endif

262
lib/h2.c
View File

@ -75,7 +75,7 @@ static int process_setting_frame(antd_request_t* rq, antd_h2_frame_header_t* fra
ptr += 2; ptr += 2;
memcpy(&param_val,ptr,4); memcpy(&param_val,ptr,4);
param_val = ntohl(param_val); param_val = ntohl(param_val);
printf("id: %d val: %d\n", param_id, param_val); //printf("id: %d val: %d\n", param_id, param_val);
switch (param_id) switch (param_id)
{ {
case H2_SETTINGS_HEADER_TABLE_SIZE: case H2_SETTINGS_HEADER_TABLE_SIZE:
@ -162,6 +162,160 @@ static int process_window_update_frame(antd_request_t* rq, antd_h2_frame_header_
return H2_NO_ERROR; return H2_NO_ERROR;
} }
static void antd_h2_error(void* source,int stream_id, int error_code)
{
// send error frame
antd_h2_conn_t* conn = H2_CONN(source);
if(!conn) return;
antd_h2_frame_header_t header;
header.identifier = stream_id;
//header.type = stat;
header.flags = 0;
header.length = 8;
uint8_t error_body[8];
int tmp = htonl(conn->last_stream_id);
memcpy(error_body, &tmp , 4 );
tmp = htonl(error_code);
memcpy(error_body + 4, &tmp ,4);
header.type = H2_FRM_RST_STREAM;
if(stream_id == 0)
header.type = H2_FRM_GOAWAY;
antd_h2_send_frame(source,&header,error_body);
}
antd_h2_stream_t* antd_h2_init_stream(int id, int wsz)
{
antd_h2_stream_t* stream = (antd_h2_stream_t*) malloc(sizeof(antd_h2_stream_t));
if(!stream) return NULL;
stream->id = id;
stream->win_sz = wsz;
stream->state = H2_STR_IDLE;
stream->stdin = ALLOC_QUEUE_ROOT();
stream->stdout = ALLOC_QUEUE_ROOT();
//stream->flags = 0;
stream->dependency = 0;
stream->weight = 255;
return stream;
}
antd_request_t* antd_h2_request_init(antd_request_t* rq, antd_h2_stream_t* stream)
{
antd_client_t* client = (antd_client_t*)malloc(sizeof(antd_client_t));
antd_request_t* h2rq = (antd_request_t*)malloc(sizeof(*h2rq));
h2rq->client = client;
h2rq->request = dict();
client->zstream = NULL;
client->z_level = ANTD_CNONE;
dictionary_t h2xheader = dict();
dictionary_t xheader = (dictionary_t)dvalue(rq->request,"REQUEST_HEADER");
dput(h2rq->request, "REQUEST_HEADER", h2xheader);
dput(h2rq->request, "REQUEST_DATA", dict());
dput_static(h2xheader, "SERVER_PORT", dvalue(xheader,"SERVER_PORT"));
dput_static(h2xheader, "SERVER_WWW_ROOT", dvalue(xheader,"SERVER_WWW_ROOT"));
dput_static(h2xheader, "REMOTE_ADDR", dvalue(xheader,"REMOTE_ADDR"));
client->id = stream->id;
time(&client->last_io);
client->stream = stream;
client->flags = CLIENT_FL_ACCEPTED | CLIENT_FL_H2_STREAM;
return h2rq;
}
static void* antd_h2_stream_handle(void* data)
{
antd_request_t* rq = (antd_request_t*) data;
antd_h2_stream_t* stream = (antd_h2_stream_t*) rq->client->stream;
// transition state here
// TODO: next day
return NULL;
}
static int process_header_frame(antd_request_t* rq, antd_h2_frame_header_t* frame_h)
{
if(frame_h->length == 0 || frame_h->identifier == 0)
{
return H2_PROTOCOL_ERROR;
}
uint8_t* data = (uint8_t*) malloc(frame_h->length);
if(!data)
{
return H2_INTERNAL_ERROR;
}
if(antd_recv(rq->client,data,frame_h->length) != (int)frame_h->length)
{
free(data);
return H2_PROTOCOL_ERROR;
}
// now parse the stream
antd_h2_conn_t* conn = H2_CONN(rq);
if(!conn)
{
free(data);
return H2_INTERNAL_ERROR;
}
int is_new_stream = 0;
antd_h2_stream_t* stream = antd_h2_get_stream(conn->streams,frame_h->identifier);
if(stream == NULL)
{
stream = antd_h2_init_stream(frame_h->identifier,conn->settings.init_win_sz);
antd_h2_add_stream(conn->streams,stream);
is_new_stream = 1;
}
if( stream->state == H2_STR_IDLE || stream->state == H2_STR_REV_LOC || stream->state == H2_STR_OPEN || stream->state == H2_STR_HALF_CLOSED_REM )
{
antd_h2_frame_t* frame = (antd_h2_frame_t*) malloc(sizeof(antd_h2_frame_t));
frame->header = *frame_h;
frame->pageload = data;
h2_stream_io_put(stream,frame);
if(is_new_stream)
{
// TODO create new request
// just dump the scheduler when we have a connection
antd_schedule_task( antd_create_task(antd_h2_stream_handle, antd_h2_request_init(rq, stream) , NULL, time(NULL)));
}
return H2_NO_ERROR;
}
else
{
free(data);
return H2_PROTOCOL_ERROR;
}
}
antd_h2_frame_t* h2_streamio_get(struct queue_root* io)
{
struct queue_head* head = queue_get(io);
if(!head) return NULL;
antd_h2_frame_t* frame = (antd_h2_frame_t*)head->data;
free(head);
return frame;
}
void h2_stream_io_put(antd_h2_stream_t* stream, antd_h2_frame_t* frame)
{
struct queue_head* head = (struct queue_head*) malloc(sizeof(struct queue_head));
INIT_QUEUE_HEAD(head);
head->data = (void*)frame;
if(frame->header.identifier % 2 == 0)
{
queue_put(head, stream->stdin);
}
else
{
queue_put(head, stream->stdout);
}
}
void antd_h2_destroy_frame(antd_h2_frame_t* frame)
{
if(frame->pageload)
free(frame->pageload);
free(frame);
}
static int process_frame(void* source, antd_h2_frame_header_t* frame_h) static int process_frame(void* source, antd_h2_frame_header_t* frame_h)
{ {
int stat; int stat;
@ -173,6 +327,9 @@ static int process_frame(void* source, antd_h2_frame_header_t* frame_h)
case H2_FRM_WINDOW_UPDATE: case H2_FRM_WINDOW_UPDATE:
stat = process_window_update_frame(source,frame_h); stat = process_window_update_frame(source,frame_h);
break; break;
case H2_FRM_HEADER:
stat = process_header_frame(source, frame_h);
break;
default: default:
printf("Frame: %d, length: %d id: %d\n", frame_h->type, frame_h->length, frame_h->identifier); printf("Frame: %d, length: %d id: %d\n", frame_h->type, frame_h->length, frame_h->identifier);
stat = H2_IGNORED; stat = H2_IGNORED;
@ -180,25 +337,7 @@ static int process_frame(void* source, antd_h2_frame_header_t* frame_h)
if(stat == H2_NO_ERROR || stat == H2_IGNORED) if(stat == H2_NO_ERROR || stat == H2_IGNORED)
return stat; return stat;
antd_h2_conn_t* conn = H2_CONN(source); antd_h2_error(source, frame_h->identifier,stat);
if(conn)
{
// send error frame
antd_h2_frame_header_t header;
header.identifier = frame_h->identifier;
//header.type = stat;
header.flags = 0;
header.length = 8;
uint8_t error_body[8];
int tmp = htonl(conn->last_stream_id);
memcpy(error_body, &tmp , 4 );
tmp = htonl(stat);
memcpy(error_body + 4, &tmp ,4);
header.type = H2_FRM_RST_STREAM;
if(frame_h->identifier == 0)
header.type = H2_FRM_GOAWAY;
antd_h2_send_frame(source,&header,error_body);
}
return stat; return stat;
} }
@ -213,15 +352,17 @@ void* antd_h2_preface_ck(void* data)
{ {
// TODO servers MUST treat an invalid connection preface as a // TODO servers MUST treat an invalid connection preface as a
// connection error (Section 5.4.1) of type PROTOCOL_ERROR // connection error (Section 5.4.1) of type PROTOCOL_ERROR
ERROR("Unable to read preface for client %d: [%s]",rq->client->sock,buf); ERROR("Unable to read preface for client %d: [%s]",rq->client->id,buf);
antd_h2_error(data,0, H2_PROTOCOL_ERROR);
return antd_empty_task((void *)rq,rq->client->last_io); return antd_empty_task((void *)rq,rq->client->last_io);
} }
buf[24] = '\0'; buf[24] = '\0';
if(strcmp(buf, H2_CONN_PREFACE) != 0) if(strcmp(buf, H2_CONN_PREFACE) != 0)
{ {
ERROR("Connection preface is not correct for client %d: [%s]",rq->client->sock,buf); ERROR("Connection preface is not correct for client %d: [%s]",rq->client->id,buf);
// TODO servers MUST treat an invalid connection preface as a // TODO servers MUST treat an invalid connection preface as a
// connection error (Section 5.4.1) of type PROTOCOL_ERROR // connection error (Section 5.4.1) of type PROTOCOL_ERROR
antd_h2_error(data,0, H2_PROTOCOL_ERROR);
return antd_empty_task((void *)rq, rq->client->last_io); return antd_empty_task((void *)rq, rq->client->last_io);
} }
// read the setting frame // read the setting frame
@ -230,8 +371,8 @@ void* antd_h2_preface_ck(void* data)
// TODO: frame error // TODO: frame error
// //
// send go away with PROTOCOL_ERROR // send go away with PROTOCOL_ERROR
printf("error reading setting frame\n"); ERROR("Unable to read setting frame from client %d",rq->client->id);
ERROR("Unable to read setting frame from client %d",rq->client->sock); antd_h2_error(data,0, H2_PROTOCOL_ERROR);
return antd_empty_task((void *)rq, rq->client->last_io); return antd_empty_task((void *)rq, rq->client->last_io);
} }
// create a connection // create a connection
@ -248,18 +389,15 @@ void* antd_h2_preface_ck(void* data)
header.identifier = 0; header.identifier = 0;
if(antd_h2_send_frame(rq, &header,NULL)) if(antd_h2_send_frame(rq, &header,NULL))
{ {
printf("frame sent\n");
return antd_create_task(antd_h2_handle, (void *)rq, NULL, rq->client->last_io); return antd_create_task(antd_h2_handle, (void *)rq, NULL, rq->client->last_io);
} }
else else
{ {
printf("cannot send frame\n");
return antd_empty_task(data, rq->client->last_io); return antd_empty_task(data, rq->client->last_io);
} }
} }
else else
{ {
printf("Error process frame %d\n", stat);
return antd_empty_task(data, rq->client->last_io); return antd_empty_task(data, rq->client->last_io);
} }
} }
@ -267,14 +405,20 @@ void* antd_h2_preface_ck(void* data)
void* antd_h2_handle(void* data) void* antd_h2_handle(void* data)
{ {
antd_request_t* rq = (antd_request_t*) data; antd_request_t* rq = (antd_request_t*) data;
antd_task_t* task; antd_task_t* task = NULL;
if(rq->client->flags & CLIENT_FL_READABLE) if(rq->client->flags & CLIENT_FL_READABLE)
{ {
antd_h2_read(data); if(!antd_h2_read(data))
{
return antd_empty_task(data, rq->client->last_io);
}
} }
if(rq->client->flags & CLIENT_FL_WRITABLE) if(rq->client->flags & CLIENT_FL_WRITABLE)
{ {
antd_h2_write(data); if(!antd_h2_write(data))
{
return antd_empty_task(data, rq->client->last_io);
}
} }
task = antd_create_task(antd_h2_handle, (void *)rq, NULL, rq->client->last_io); task = antd_create_task(antd_h2_handle, (void *)rq, NULL, rq->client->last_io);
@ -285,30 +429,34 @@ void* antd_h2_handle(void* data)
void* antd_h2_read(void* data) int antd_h2_read(void* data)
{ {
antd_h2_frame_header_t frame_h; antd_h2_frame_header_t frame_h;
antd_request_t* rq = (antd_request_t*) data; antd_request_t* rq = (antd_request_t*) data;
if(!antd_h2_read_frame_header(rq->client, &frame_h)) if(!antd_h2_read_frame_header(rq->client, &frame_h))
{ {
// TODO: frame error
// send goaway frame // send goaway frame
ERROR("Unable to read frame from client %d",rq->client->sock); ERROR("Unable to read frame from client %d",rq->client->id);
return antd_empty_task(data, rq->client->last_io); antd_h2_error(data, 0, H2_PROTOCOL_ERROR);
return 0;
} }
process_frame(data, &frame_h); int stat = process_frame(data, &frame_h);
return antd_empty_task(data, rq->client->last_io); if(stat == H2_NO_ERROR || stat == H2_IGNORED)
return 1;
return 0;
} }
void* antd_h2_write(void* data) int antd_h2_write(void* data)
{ {
antd_request_t* rq = (antd_request_t*) data; UNUSED(data);
//antd_request_t* rq = (antd_request_t*) data;
//printf("write task\n"); //printf("write task\n");
return antd_empty_task(data, rq->client->last_io); return 1;
} }
antd_h2_conn_t* antd_h2_open_conn() antd_h2_conn_t* antd_h2_open_conn()
{ {
antd_h2_conn_t* conn = (antd_h2_conn_t*) malloc(sizeof(conn)); antd_h2_conn_t* conn = (antd_h2_conn_t*) malloc(sizeof(antd_h2_conn_t));
if(! conn) if(! conn)
return NULL; return NULL;
@ -321,7 +469,11 @@ antd_h2_conn_t* antd_h2_open_conn()
conn->win_sz = conn->settings.init_win_sz; conn->win_sz = conn->settings.init_win_sz;
conn->last_stream_id = 0; conn->last_stream_id = 0;
conn->streams = (antd_h2_stream_list_t*) malloc(2*sizeof(antd_h2_stream_list_t)); conn->streams = (antd_h2_stream_list_t*) malloc(2*sizeof(antd_h2_stream_list_t));
if(conn->streams)
{
conn->streams[0] = NULL;
conn->streams[1] = NULL;
}
return conn; return conn;
} }
@ -333,8 +485,8 @@ void antd_h2_close_conn(antd_h2_conn_t* conn)
if(conn->streams) if(conn->streams)
{ {
antd_h2_close_all_streams(conn->streams[0]); antd_h2_close_all_streams(&conn->streams[0]);
antd_h2_close_all_streams(conn->streams[1]); antd_h2_close_all_streams(&conn->streams[1]);
free(conn->streams); free(conn->streams);
} }
free(conn); free(conn);
@ -398,19 +550,31 @@ antd_h2_stream_t* antd_h2_get_stream(antd_h2_stream_list_t* streams, int id)
void antd_h2_close_stream(antd_h2_stream_t* stream) void antd_h2_close_stream(antd_h2_stream_t* stream)
{ {
if(!stream) return; if(!stream) return;
if(stream->stdin) free(stream->stdin); // TODO empty the queue
if(stream->stdout) free(stream->stdout); if(stream->stdin)
free(stream); {
queue_empty(stream->stdin, (void (*)(void*))antd_h2_destroy_frame);
free(stream->stdin);
}
if(stream->stdout)
{
queue_empty(stream->stdout, (void (*)(void*))antd_h2_destroy_frame);
free(stream->stdout);
}
//free(stream);
} }
void antd_h2_close_all_streams(antd_h2_stream_list_t streams) void antd_h2_close_all_streams(antd_h2_stream_list_t* streams)
{ {
antd_h2_stream_list_t it; antd_h2_stream_list_t it;
for(it = streams; it != NULL; it = it->next) while((*streams) != NULL)
{ {
it = *streams;
(*streams) = (*streams)->next;
if(it->stream) if(it->stream)
{ {
antd_h2_close_stream(it->stream); antd_h2_close_stream(it->stream);
free(it->stream);
} }
free(it); free(it);
} }
@ -436,6 +600,7 @@ void antd_h2_del_stream(antd_h2_stream_list_t* streams, int id)
it = streams[idx]; it = streams[idx];
streams[idx] = it->next; streams[idx] = it->next;
antd_h2_close_stream(it->stream); antd_h2_close_stream(it->stream);
free(it->stream);
free(it); free(it);
return; return;
} }
@ -448,6 +613,7 @@ void antd_h2_del_stream(antd_h2_stream_list_t* streams, int id)
it->next = np->next; it->next = np->next;
np->next = NULL; np->next = NULL;
antd_h2_close_stream(np->stream); antd_h2_close_stream(np->stream);
free(np->stream);
free(np); free(np);
return; return;
} }

View File

@ -117,6 +117,17 @@ field value other than 0 MUST be treated as a connection error
#define H2_SETTINGS_MAX_HEADER_LIST_SIZE 0x6 #define H2_SETTINGS_MAX_HEADER_LIST_SIZE 0x6
// stream state
typedef enum {
H2_STR_IDLE,
H2_STR_OPEN,
H2_STR_REV_LOC,
H2_STR_REV_REM,
H2_STR_HALF_CLOSED_LOC,
H2_STR_HALF_CLOSED_REM,
H2_STR_CLOSED
} antd_h2_stream_state_t;
typedef struct{ typedef struct{
uint32_t header_table_sz; uint32_t header_table_sz;
uint32_t enable_push; uint32_t enable_push;
@ -150,6 +161,10 @@ typedef struct {
struct queue_root* stdin; struct queue_root* stdin;
struct queue_root* stdout; struct queue_root* stdout;
int win_sz; int win_sz;
antd_h2_stream_state_t state;
//uint8_t flags;
int dependency;
uint8_t weight;
int id; int id;
} antd_h2_stream_t; } antd_h2_stream_t;
@ -167,20 +182,34 @@ typedef struct {
unsigned int identifier; unsigned int identifier;
} antd_h2_frame_header_t; } antd_h2_frame_header_t;
typedef struct {
antd_h2_frame_header_t header;
uint8_t* pageload;
} antd_h2_frame_t;
/*Frame utilities functions*/
void antd_h2_destroy_frame(antd_h2_frame_t*);
/*stream utilities functions*/ /*stream utilities functions*/
antd_h2_stream_t* antd_h2_init_stream(int id, int wsz);
void antd_h2_close_stream(antd_h2_stream_t* stream); void antd_h2_close_stream(antd_h2_stream_t* stream);
void antd_h2_add_stream(antd_h2_stream_list_t*, antd_h2_stream_t*); void antd_h2_add_stream(antd_h2_stream_list_t*, antd_h2_stream_t*);
antd_h2_stream_t* antd_h2_get_stream(antd_h2_stream_list_t*, int); antd_h2_stream_t* antd_h2_get_stream(antd_h2_stream_list_t*, int);
void antd_h2_del_stream(antd_h2_stream_list_t*, int); void antd_h2_del_stream(antd_h2_stream_list_t*, int);
void antd_h2_close_all_streams(antd_h2_stream_list_t); void antd_h2_close_all_streams(antd_h2_stream_list_t*);
void antd_h2_update_streams_win_sz(antd_h2_stream_list_t streams, int offset); void antd_h2_update_streams_win_sz(antd_h2_stream_list_t streams, int offset);
void h2_stream_io_put(antd_h2_stream_t*, antd_h2_frame_t*);
antd_h2_frame_t* h2_streamio_get(struct queue_root*);
antd_request_t* antd_h2_request_init(antd_request_t*, antd_h2_stream_t*);
/*Connection utilities funtions*/ /*Connection utilities funtions*/
antd_h2_conn_t* antd_h2_open_conn(); antd_h2_conn_t* antd_h2_open_conn();
void antd_h2_close_conn(antd_h2_conn_t*); void antd_h2_close_conn(antd_h2_conn_t*);
void* antd_h2_read(void* rq); int antd_h2_read(void* rq);
void* antd_h2_write(void* rq); int antd_h2_write(void* rq);
void* antd_h2_preface_ck(void* rq); void* antd_h2_preface_ck(void* rq);
void* antd_h2_handle(void* rq); void* antd_h2_handle(void* rq);
int antd_h2_send_frame(antd_request_t*, antd_h2_frame_header_t*, uint8_t*); int antd_h2_send_frame(antd_request_t*, antd_h2_frame_header_t*, uint8_t*);

View File

@ -104,6 +104,11 @@ void plugindir(char* dest)
UNUSED(dest); UNUSED(dest);
} }
void antd_schedule_task(antd_task_t* task)
{
UNUSED(task);
}
const char* get_status_str(int stat) const char* get_status_str(int stat)
{ {
switch(stat) switch(stat)
@ -326,7 +331,7 @@ int antd_send(void *src, const void* data_in, int len_in)
int count; int count;
#ifdef USE_OPENSSL #ifdef USE_OPENSSL
if(source->ssl) if(source->stream)
{ {
//LOG("SSL WRITE\n"); //LOG("SSL WRITE\n");
//ret = SSL_write((SSL*) source->ssl, data, len); //ret = SSL_write((SSL*) source->ssl, data, len);
@ -339,8 +344,8 @@ int antd_send(void *src, const void* data_in, int len_in)
{ {
// clear the error queue // clear the error queue
ERR_clear_error(); ERR_clear_error();
count = SSL_write (source->ssl, ptr+written, writelen); count = SSL_write (source->stream, ptr+written, writelen);
int err = SSL_get_error(source->ssl, count); int err = SSL_get_error(source->stream, count);
if (count > 0) if (count > 0)
{ {
written += count; written += count;
@ -378,7 +383,7 @@ int antd_send(void *src, const void* data_in, int len_in)
// no data available right now, wait a few seconds in case new data arrives... // no data available right now, wait a few seconds in case new data arrives...
//printf("SSL_ERROR_WANT_READ\n"); //printf("SSL_ERROR_WANT_READ\n");
int sock = SSL_get_rfd(source->ssl); int sock = SSL_get_rfd(source->stream);
FD_ZERO(&fds); FD_ZERO(&fds);
FD_SET(sock, &fds); FD_SET(sock, &fds);
@ -398,7 +403,7 @@ int antd_send(void *src, const void* data_in, int len_in)
{ {
// socket not writable right now, wait a few seconds and try again... // socket not writable right now, wait a few seconds and try again...
//printf("SSL_ERROR_WANT_WRITE \n"); //printf("SSL_ERROR_WANT_WRITE \n");
int sock = SSL_get_wfd(source->ssl); int sock = SSL_get_wfd(source->stream);
FD_ZERO(&fds); FD_ZERO(&fds);
FD_SET(sock, &fds); FD_SET(sock, &fds);
@ -437,7 +442,7 @@ int antd_send(void *src, const void* data_in, int len_in)
written = 0; written = 0;
while (writelen > 0) while (writelen > 0)
{ {
count = send(source->sock, ptr+written, writelen, 0); count = send(source->id, ptr+written, writelen, 0);
if (count > 0) if (count > 0)
{ {
written += count; written += count;
@ -468,7 +473,7 @@ int antd_recv(void *src, void* data, int len)
int readlen=0; int readlen=0;
antd_client_t * source = (antd_client_t *) src; antd_client_t * source = (antd_client_t *) src;
#ifdef USE_OPENSSL #ifdef USE_OPENSSL
if(source->ssl) if(source->stream)
{ {
ptr = (char* )data; ptr = (char* )data;
readlen = len > BUFFLEN?BUFFLEN:len; readlen = len > BUFFLEN?BUFFLEN:len;
@ -478,8 +483,8 @@ int antd_recv(void *src, void* data, int len)
while (readlen > 0 )//&& source->attempt < MAX_ATTEMPT while (readlen > 0 )//&& source->attempt < MAX_ATTEMPT
{ {
ERR_clear_error(); ERR_clear_error();
received = SSL_read (source->ssl, ptr+read, readlen); received = SSL_read (source->stream, ptr+read, readlen);
int err = SSL_get_error(source->ssl, received); int err = SSL_get_error(source->stream, received);
if (received > 0) if (received > 0)
{ {
read += received; read += received;
@ -518,7 +523,7 @@ int antd_recv(void *src, void* data, int len)
// no data available right now, wait a few seconds in case new data arrives... // no data available right now, wait a few seconds in case new data arrives...
//printf("SSL_ERROR_WANT_READ\n"); //printf("SSL_ERROR_WANT_READ\n");
int sock = SSL_get_rfd(source->ssl); int sock = SSL_get_rfd(source->stream);
FD_ZERO(&fds); FD_ZERO(&fds);
FD_SET(sock, &fds); FD_SET(sock, &fds);
@ -538,7 +543,7 @@ int antd_recv(void *src, void* data, int len)
{ {
// socket not writable right now, wait a few seconds and try again... // socket not writable right now, wait a few seconds and try again...
//printf("SSL_ERROR_WANT_WRITE \n"); //printf("SSL_ERROR_WANT_WRITE \n");
int sock = SSL_get_wfd(source->ssl); int sock = SSL_get_wfd(source->stream);
FD_ZERO(&fds); FD_ZERO(&fds);
FD_SET(sock, &fds); FD_SET(sock, &fds);
@ -595,7 +600,7 @@ int antd_recv(void *src, void* data, int len)
read = 0; read = 0;
while (readlen > 0 ) while (readlen > 0 )
{ {
received = recv(((int) source->sock), ptr+read, readlen, 0); received = recv(((int) source->id), ptr+read, readlen, 0);
//LOG("Read : %c\n", *ptr); //LOG("Read : %c\n", *ptr);
if (received > 0) if (received > 0)
{ {
@ -656,24 +661,24 @@ int antd_close(void* src)
} }
#endif #endif
#ifdef USE_OPENSSL #ifdef USE_OPENSSL
if(source->ssl){ if(source->stream){
//printf("SSL:Shutdown ssl\n"); //printf("SSL:Shutdown ssl\n");
//SSL_shutdown((SSL*) source->ssl); //SSL_shutdown((SSL*) source->ssl);
SSL_set_shutdown((SSL*) source->ssl, SSL_SENT_SHUTDOWN|SSL_RECEIVED_SHUTDOWN); SSL_set_shutdown((SSL*) source->stream, SSL_SENT_SHUTDOWN|SSL_RECEIVED_SHUTDOWN);
//printf("SSL:Free ssl\n"); //printf("SSL:Free ssl\n");
SSL_free((SSL*) source->ssl); SSL_free((SSL*) source->stream);
//EVP_cleanup(); //EVP_cleanup();
//ENGINE_cleanup(); //ENGINE_cleanup();
CRYPTO_cleanup_all_ex_data(); CRYPTO_cleanup_all_ex_data();
ERR_remove_state(0); ERR_remove_state(0);
ERR_free_strings(); ERR_free_strings();
source->ssl = NULL; source->stream = NULL;
//LOG("Freeing SSL\n"); //LOG("Freeing SSL\n");
} }
#endif #endif
//printf("Close sock %d\n", source->sock); //printf("Close sock %d\n", source->sock);
int ret = close(source->sock); int ret = close(source->id);
free(src); free(src);
src = NULL; src = NULL;
return ret; return ret;

View File

@ -44,6 +44,7 @@ typedef enum {ANTD_CGZ, ANTD_CDEFL, ANTD_CNONE} antd_compress_t;
#define CLIENT_FL_HTTP_1_1 0x04 #define CLIENT_FL_HTTP_1_1 0x04
#define CLIENT_FL_READABLE 0x08 #define CLIENT_FL_READABLE 0x08
#define CLIENT_FL_WRITABLE 0x10 #define CLIENT_FL_WRITABLE 0x10
#define CLIENT_FL_H2_STREAM 0x20
typedef struct { typedef struct {
unsigned int port; unsigned int port;
@ -54,8 +55,8 @@ typedef struct {
} port_config_t; } port_config_t;
typedef struct{ typedef struct{
int sock; int id;
void* ssl; void* stream;
uint8_t flags; uint8_t flags;
time_t last_io; time_t last_io;
// compress option // compress option
@ -122,6 +123,7 @@ void __attribute__((weak)) dbdir(char* dest);
void __attribute__((weak)) tmpdir(char* dest); void __attribute__((weak)) tmpdir(char* dest);
void __attribute__((weak)) plugindir(char* dest); void __attribute__((weak)) plugindir(char* dest);
int __attribute__((weak)) compressable(char* ctype); int __attribute__((weak)) compressable(char* ctype);
void __attribute__((weak)) antd_schedule_task(antd_task_t*);
void set_nonblock(int socket); void set_nonblock(int socket);
//void set_block(int socket); //void set_block(int socket);

View File

@ -50,6 +50,24 @@ void queue_put(struct queue_head *new,
} }
} }
int queue_readable(struct queue_root *root)
{
return root->out_queue != NULL && root->out_queue != QUEUE_POISON1;
}
void queue_empty(struct queue_root * root, void (* fn)(void*))
{
struct queue_head* head = NULL;
while((head = queue_get(root)) != NULL)
{
if(fn)
{
fn(head->data);
}
free(head);
}
}
struct queue_head *queue_get(struct queue_root *root) struct queue_head *queue_get(struct queue_root *root)
{ {
// pthread_spin_lock(&root->lock); // pthread_spin_lock(&root->lock);

View File

@ -14,7 +14,8 @@ struct queue_root *ALLOC_QUEUE_ROOT();
void INIT_QUEUE_HEAD(struct queue_head *head); void INIT_QUEUE_HEAD(struct queue_head *head);
void queue_put(struct queue_head *,struct queue_root *); void queue_put(struct queue_head *,struct queue_root *);
int queue_readable(struct queue_root *);
void queue_empty(struct queue_root *, void (*)(void*) );
struct queue_head *queue_get(struct queue_root *root); struct queue_head *queue_get(struct queue_root *root);
#endif // QUEUE_H #endif // QUEUE_H

View File

@ -402,7 +402,7 @@ int ws_client_connect(ws_client_t* wsclient, port_config_t pcnf)
return -1; return -1;
} }
// will be free // will be free
wsclient->antdsock->sock = sock; wsclient->antdsock->id = sock;
wsclient->antdsock->flags = 0; wsclient->antdsock->flags = 0;
wsclient->antdsock->last_io = time(NULL); wsclient->antdsock->last_io = time(NULL);
wsclient->antdsock->zstream = NULL; wsclient->antdsock->zstream = NULL;
@ -486,19 +486,19 @@ int ws_client_connect(ws_client_t* wsclient, port_config_t pcnf)
SSL_CTX_set_verify(wsclient->ssl_ctx, SSL_VERIFY_NONE, NULL); SSL_CTX_set_verify(wsclient->ssl_ctx, SSL_VERIFY_NONE, NULL);
} }
wsclient->antdsock->ssl = (void*)SSL_new(wsclient->ssl_ctx); wsclient->antdsock->stream = (void*)SSL_new(wsclient->ssl_ctx);
if(!wsclient->antdsock->ssl) if(!wsclient->antdsock->stream)
{ {
ssl_err = ERR_get_error(); ssl_err = ERR_get_error();
ERROR("SSL_new: %s", ERR_error_string(ssl_err, NULL)); ERROR("SSL_new: %s", ERR_error_string(ssl_err, NULL));
return -1; return -1;
} }
SSL_set_fd((SSL*)wsclient->antdsock->ssl, wsclient->antdsock->sock); SSL_set_fd((SSL*)wsclient->antdsock->stream, wsclient->antdsock->id);
int stat, ret; int stat, ret;
ERR_clear_error(); ERR_clear_error();
while( (ret = SSL_connect(wsclient->antdsock->ssl)) <= 0) while( (ret = SSL_connect(wsclient->antdsock->stream)) <= 0)
{ {
stat = SSL_get_error(wsclient->antdsock->ssl, ret); stat = SSL_get_error(wsclient->antdsock->stream, ret);
switch (stat) switch (stat)
{ {
case SSL_ERROR_WANT_READ: case SSL_ERROR_WANT_READ: