This commit is contained in:
lxsang 2020-01-28 18:02:09 +01:00
parent a86a2d150e
commit 894f6d0f31
11 changed files with 562 additions and 72 deletions

View File

@ -21,6 +21,7 @@ libantd_la_SOURCES = lib/ini.c \
lib/ws.c \ lib/ws.c \
lib/sha1.c \ lib/sha1.c \
lib/list.c \ lib/list.c \
lib/queue.c \
lib/scheduler.c \ lib/scheduler.c \
lib/h2.c lib/h2.c
@ -34,7 +35,8 @@ pkginclude_HEADERS = lib/ini.h \
lib/list.h \ lib/list.h \
lib/scheduler.h \ lib/scheduler.h \
lib/plugin.h \ lib/plugin.h \
lib/h2.h lib/h2.h \
lib/queue.h
EXTRA_DIST = plugin_manager.h http_server.h README.md LICENSE antd-config.ini EXTRA_DIST = plugin_manager.h http_server.h README.md LICENSE antd-config.ini

View File

@ -392,17 +392,15 @@ void *accept_request(void *data)
return task; return task;
} }
} }
#endif #if OPENSSL_VERSION_NUMBER >= 0x10002000L
//printf("Flag: %d\n", client->flags); if(!(client->flags & CLIENT_FL_HTTP_1_1))
// now return the task base on the http version
if(client->flags & CLIENT_FL_HTTP_1_1)
{
task->handle = decode_request_header;
}
else
{ {
task->handle = antd_h2_preface_ck; task->handle = antd_h2_preface_ck;
return task;
} }
#endif
#endif
task->handle = decode_request_header;
return task; return task;
} }
@ -1399,3 +1397,41 @@ int compressable(char* ctype)
return 0; return 0;
} }
#endif #endif
void destroy_request(void *data)
{
if (!data)
return;
antd_request_t *rq = (antd_request_t *)data;
//LOG("Close request %d", rq->client->sock);
// free all other thing
if (rq->request)
{
dictionary_t tmp = dvalue(rq->request, "COOKIE");
if (tmp)
freedict(tmp);
tmp = dvalue(rq->request, "REQUEST_HEADER");
if (tmp)
freedict(tmp);
tmp = dvalue(rq->request, "REQUEST_DATA");
if (tmp)
freedict(tmp);
dput(rq->request, "REQUEST_HEADER", NULL);
dput(rq->request, "REQUEST_DATA", NULL);
dput(rq->request, "COOKIE", NULL);
#ifdef USE_OPENSSL
#if OPENSSL_VERSION_NUMBER >= 0x10002000L
antd_h2_conn_t* conn = H2_CONN(data);
if(conn)
{
//H2_CONNECTION
antd_h2_close_conn(conn);
dput(rq->request, "H2_CONNECTION", NULL);
}
#endif
#endif
freedict(rq->request);
}
antd_close(rq->client);
free(rq);
}

View File

@ -40,5 +40,6 @@ void* decode_multi_part_request_data(void* data);
void decode_cookie(const char*, dictionary_t d); void decode_cookie(const char*, dictionary_t d);
char* post_data_decode(void*,int); char* post_data_decode(void*,int);
void* execute_plugin(void* data, const char *path); void* execute_plugin(void* data, const char *path);
void destroy_request(void *data);
#endif #endif

View File

@ -284,7 +284,7 @@ int main(int argc, char* argv[])
pthread_t scheduler_th; pthread_t scheduler_th;
if (pthread_create(&scheduler_th, NULL,(void *(*)(void *))antd_scheduler_wait, (void*)&scheduler) != 0) if (pthread_create(&scheduler_th, NULL,(void *(*)(void *))antd_scheduler_wait, (void*)&scheduler) != 0)
{ {
ERROR("pthread_create: cannot create worker"); ERROR("pthread_create: cannot create scheduler thread");
stop_serve(0); stop_serve(0);
exit(1); exit(1);
} }

View File

@ -118,8 +118,6 @@ chain_t insert(dictionary_t dic,const char* key, void* value, antd_dict_item_typ
return np; return np;
} }
chain_t dremove(dictionary_t dic, const char* key) chain_t dremove(dictionary_t dic, const char* key)
{ {
if(dic->map == NULL) return 0; if(dic->map == NULL) return 0;

367
lib/h2.c
View File

@ -1,6 +1,13 @@
#include <netinet/in.h>
#include "h2.h" #include "h2.h"
#include "scheduler.h" #include "scheduler.h"
struct antd_h2_stream_list_t{
struct antd_h2_stream_list_t* next;
antd_h2_stream_t* stream;
};
static int antd_h2_read_frame_header(antd_client_t* cl, antd_h2_frame_header_t* frame) static int antd_h2_read_frame_header(antd_client_t* cl, antd_h2_frame_header_t* frame)
{ {
frame->length = 0; frame->length = 0;
@ -11,43 +18,191 @@ static int antd_h2_read_frame_header(antd_client_t* cl, antd_h2_frame_header_t*
if( antd_recv(cl,& header,sizeof(header)) != sizeof(header)) return 0; if( antd_recv(cl,& header,sizeof(header)) != sizeof(header)) return 0;
// network byte order is big endian // network byte order is big endian
// read frame length // read frame length
frame->length = (*header << 16) + (*(header + 1)<< 8) + *(header+2); memcpy( ((uint8_t*)(&frame->length)) + 1,header,3);
frame->length = ntohl(frame->length);
// frame type // frame type
frame->type = *(header + 3); frame->type = *(header + 3);
// frame flags // frame flags
frame->flags = *(header + 4); frame->flags = *(header + 4);
// frame identifier memcpy(&frame->identifier,header+5,4);
frame->identifier = ((*(header + 5) & 0x7F) << 24) + (*(header + 6)<< 16) + (*(header + 7)<< 8) + *(header + 8); frame->identifier = ntohl(frame->identifier) & 0x7FFFFFFF;
return 1; return 1;
} }
static int process_setting_frame(antd_request_t* rq, antd_h2_frame_header_t* frame_h)
static int process_frame(void* source, antd_h2_frame_header_t* frame_h)
{ {
// verify frame if(frame_h->length == 0)
printf("Frame type: %d\n", frame_h->type & 0xff); return H2_NO_ERROR;
printf("Frame flag: %d\n",frame_h->flags); if(frame_h->flags & H2_SETTING_ACK_FLG)
printf("frame identifier: %d\n", frame_h->identifier); {
return H2_FRAME_SIZE_ERROR;
}
if(frame_h->identifier != 0)
{
return H2_PROTOCOL_ERROR;
}
if(frame_h->length % 6 != 0)
{
return H2_FRAME_SIZE_ERROR;
}
uint8_t* frame_data = (uint8_t*)malloc(frame_h->length); uint8_t* frame_data = (uint8_t*)malloc(frame_h->length);
if(!frame_data) if(!frame_data)
{ {
return 0; return H2_INTERNAL_ERROR;
} }
antd_request_t* rq = (antd_request_t*) source; antd_h2_conn_t* conn = (antd_h2_conn_t*) dvalue(rq->request,"H2_CONNECTION");
if(antd_recv(rq->client,frame_data,frame_h->length) != frame_h->length) if(!conn)
{
return H2_INTERNAL_ERROR;
}
if(antd_recv(rq->client,frame_data,frame_h->length) != (int)frame_h->length)
{ {
// TODO error
// go away
ERROR("Cannot read all frame data"); ERROR("Cannot read all frame data");
free(frame_data); free(frame_data);
return H2_NO_ERROR; return H2_PROTOCOL_ERROR;
} }
// read each identifier
uint16_t param_id;
int param_val;
uint8_t* ptr;
for (size_t i = 0; i < frame_h->length / 6 ; i++)
{
ptr = frame_data + i*6;
memcpy(&param_id,ptr,2);
param_id = ntohs(param_id);
ptr += 2;
memcpy(&param_val,ptr,4);
param_val = ntohl(param_val);
printf("id: %d val: %d\n", param_id, param_val);
switch (param_id)
{
case H2_SETTINGS_HEADER_TABLE_SIZE:
conn->settings.header_table_sz = param_val;
break;
case H2_SETTINGS_ENABLE_PUSH:
if(param_val != 0 || param_val != 1)
{
free(frame_data); free(frame_data);
return H2_PROTOCOL_ERROR;
}
conn->settings.enable_push = param_val;
break;
case H2_SETTINGS_MAX_CONCURRENT_STREAMS:
conn->settings.max_concurrent_streams = param_val;
break;
case H2_SETTINGS_INITIAL_WINDOW_SIZE:
if(param_val > 2147483647)// 2^31-1
{
free(frame_data);
return H2_FLOW_CONTROL_ERROR;
}
int offset = param_val - conn->settings.init_win_sz;
conn->settings.init_win_sz = param_val;
// this should applied only to streams with active flow-control windows
antd_h2_update_streams_win_sz(conn->streams[0], offset);
/*
In addition to changing the flow-control window for streams that are
not yet active, a SETTINGS frame can alter the initial flow-control
window size for streams with active flow-control windows (that is,
streams in the "open" or "half-closed (remote)" state). When the
value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust
the size of all stream flow-control windows that it maintains by the
difference between the new value and the old value.
*/
break;
case H2_SETTINGS_MAX_FRAME_SIZE:
if(param_val < 16384 || param_val > 16777215) // < 2^14 or > 2^24-1
{
free(frame_data);
return H2_PROTOCOL_ERROR;
}
conn->settings.max_frame_sz = param_val;
break;
case H2_SETTINGS_MAX_HEADER_LIST_SIZE:
conn->settings.max_header_list_sz = param_val;
break;
default:
free(frame_data);
return H2_IGNORED;
}
}
free(frame_data);
// send back ack setting frame
return H2_NO_ERROR; return H2_NO_ERROR;
} }
static int process_window_update_frame(antd_request_t* rq, antd_h2_frame_header_t* frame_h)
{
int window_size_incr;
if(antd_recv(rq->client,&window_size_incr, 4) != 4)
{
return H2_PROTOCOL_ERROR;
}
window_size_incr = ntohl(window_size_incr) & 0x7FFFFFFF;
if(window_size_incr <1 || window_size_incr > 2147483647)
return H2_PROTOCOL_ERROR;
antd_h2_conn_t* conn = H2_CONN(rq);
if(!conn)
return H2_INTERNAL_ERROR;
if(frame_h->identifier == 0)
{
conn->win_sz += window_size_incr;
}
else
{
antd_h2_stream_t* stream = antd_h2_get_stream(conn->streams,frame_h->identifier);
if(!stream)
return H2_INTERNAL_ERROR;
stream->win_sz += window_size_incr;
}
return H2_NO_ERROR;
}
static int process_frame(void* source, antd_h2_frame_header_t* frame_h)
{
int stat;
switch (frame_h->type)
{
case H2_FRM_SETTINGS:
stat = process_setting_frame(source, frame_h);
break;
case H2_FRM_WINDOW_UPDATE:
stat = process_window_update_frame(source,frame_h);
break;
default:
printf("Frame: %d, length: %d id: %d\n", frame_h->type, frame_h->length, frame_h->identifier);
stat = H2_IGNORED;
}
if(stat == H2_NO_ERROR || stat == H2_IGNORED)
return stat;
antd_h2_conn_t* conn = H2_CONN(source);
if(conn)
{
// send error frame
antd_h2_frame_header_t header;
header.identifier = frame_h->identifier;
//header.type = stat;
header.flags = 0;
header.length = 8;
uint8_t error_body[8];
int tmp = htonl(conn->last_stream_id);
memcpy(error_body, &tmp , 4 );
tmp = htonl(stat);
memcpy(error_body + 4, &tmp ,4);
header.type = H2_FRM_RST_STREAM;
if(frame_h->identifier == 0)
header.type = H2_FRM_GOAWAY;
antd_h2_send_frame(source,&header,error_body);
}
return stat;
}
void* antd_h2_preface_ck(void* data) void* antd_h2_preface_ck(void* data)
{ {
char buf[25]; char buf[25];
@ -79,8 +234,34 @@ void* antd_h2_preface_ck(void* data)
ERROR("Unable to read setting frame from client %d",rq->client->sock); ERROR("Unable to read setting frame from client %d",rq->client->sock);
return antd_empty_task((void *)rq, rq->client->last_io); return antd_empty_task((void *)rq, rq->client->last_io);
} }
process_frame(rq, &frame_h); // create a connection
dput(rq->request,"H2_CONNECTION",antd_h2_open_conn());
int stat = process_frame(rq, &frame_h);
if( stat == H2_NO_ERROR || stat == H2_IGNORED)
{
//TODO: send back setting frame
// and init the conn
antd_h2_frame_header_t header;
header.length = 0;
header.type = H2_FRM_SETTINGS;
header.flags = 0x1;
header.identifier = 0;
if(antd_h2_send_frame(rq, &header,NULL))
{
printf("frame sent\n");
return antd_create_task(antd_h2_handle, (void *)rq, NULL, rq->client->last_io); return antd_create_task(antd_h2_handle, (void *)rq, NULL, rq->client->last_io);
}
else
{
printf("cannot send frame\n");
return antd_empty_task(data, rq->client->last_io);
}
}
else
{
printf("Error process frame %d\n", stat);
return antd_empty_task(data, rq->client->last_io);
}
} }
void* antd_h2_handle(void* data) void* antd_h2_handle(void* data)
@ -96,7 +277,7 @@ void* antd_h2_handle(void* data)
antd_h2_write(data); antd_h2_write(data);
} }
task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); task = antd_create_task(antd_h2_handle, (void *)rq, NULL, rq->client->last_io);
task->priority++; task->priority++;
return task; return task;
} }
@ -121,6 +302,154 @@ void* antd_h2_read(void* data)
void* antd_h2_write(void* data) void* antd_h2_write(void* data)
{ {
antd_request_t* rq = (antd_request_t*) data; antd_request_t* rq = (antd_request_t*) data;
printf("write task\n"); //printf("write task\n");
return antd_empty_task(data, rq->client->last_io); return antd_empty_task(data, rq->client->last_io);
} }
antd_h2_conn_t* antd_h2_open_conn()
{
antd_h2_conn_t* conn = (antd_h2_conn_t*) malloc(sizeof(conn));
if(! conn)
return NULL;
conn->settings.header_table_sz = 4096;
conn->settings.enable_push = 1;
conn->settings.max_concurrent_streams = 100;
conn->settings.init_win_sz = 65535;
conn->settings.max_frame_sz = 16384;
conn->settings.max_header_list_sz = 0; //unlimited
conn->win_sz = conn->settings.init_win_sz;
conn->last_stream_id = 0;
conn->streams = (antd_h2_stream_list_t*) malloc(2*sizeof(antd_h2_stream_list_t));
return conn;
}
void antd_h2_close_conn(antd_h2_conn_t* conn)
{
if(! conn)
return;
if(conn->streams)
{
antd_h2_close_all_streams(conn->streams[0]);
antd_h2_close_all_streams(conn->streams[1]);
free(conn->streams);
}
free(conn);
}
int antd_h2_send_frame(antd_request_t* rq, antd_h2_frame_header_t* fr_header, uint8_t* data)
{
// send the frame header in network bytes order
uint8_t header[9];
int nbo_int = htonl(fr_header->length) >> 8;
memcpy(header,&nbo_int,3);
// type
*(header+3) = fr_header->type;
// flag
*(header+4) = fr_header->flags;
// identifier
nbo_int = htonl(fr_header->identifier);
memcpy(header+5, &nbo_int,4);
if(antd_send(rq->client,header,sizeof(header)) != sizeof(header))
{
return 0;
}
if(fr_header->length == 0)
{
return 1;
}
if(data == NULL)
{
return 0;
}
// send data
if(antd_send(rq->client,data,fr_header->length) !=(int)fr_header->length)
{
return 0;
}
return 1;
}
/*stream utilities functions*/
void antd_h2_add_stream(antd_h2_stream_list_t* streams, antd_h2_stream_t* stream)
{
if(!stream || !streams) return;
int idx = stream->id % 2;
antd_h2_stream_list_t it = (antd_h2_stream_list_t) malloc(sizeof(it));
it->next = streams[idx];
it->stream = stream;
streams[idx] = it;
}
antd_h2_stream_t* antd_h2_get_stream(antd_h2_stream_list_t* streams, int id)
{
int idx = id % 2;
antd_h2_stream_list_t it;
for(it = streams[idx]; it != NULL; it = it->next)
{
if(id == it->stream->id)
return it->stream;
}
return NULL;
}
void antd_h2_close_stream(antd_h2_stream_t* stream)
{
if(!stream) return;
if(stream->stdin) free(stream->stdin);
if(stream->stdout) free(stream->stdout);
free(stream);
}
void antd_h2_close_all_streams(antd_h2_stream_list_t streams)
{
antd_h2_stream_list_t it;
for(it = streams; it != NULL; it = it->next)
{
if(it->stream)
{
antd_h2_close_stream(it->stream);
}
free(it);
}
}
void antd_h2_update_streams_win_sz(antd_h2_stream_list_t streams, int offset)
{
antd_h2_stream_list_t it;
for(it = streams; it != NULL; it = it->next)
{
if(it->stream)
{
it->stream->win_sz += offset;
}
}
}
void antd_h2_del_stream(antd_h2_stream_list_t* streams, int id)
{
int idx = id % 2;
antd_h2_stream_list_t it;
if(streams[idx] && streams[idx]->stream->id == id)
{
it = streams[idx];
streams[idx] = it->next;
antd_h2_close_stream(it->stream);
free(it);
return;
}
for(it = streams[idx]; it != NULL; it = it->next)
{
if(it->next!= NULL && id == it->next->stream->id)
{
antd_h2_stream_list_t np;
np = it->next;
it->next = np->next;
np->next = NULL;
antd_h2_close_stream(np->stream);
free(np);
return;
}
}
}

View File

@ -1,7 +1,9 @@
#ifndef HTTP2_H #ifndef HTTP2_H
#define HTTP2_H #define HTTP2_H
#include "handle.h" #include "handle.h"
#include "hpack.h" #include "hpack.h"
#include "queue.h"
#define H2_CONN_PREFACE "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" #define H2_CONN_PREFACE "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
@ -90,21 +92,65 @@ instead of HTTP/2.
*/ */
#define H2_HTTP_1_1_REQUIRED 0xd #define H2_HTTP_1_1_REQUIRED 0xd
/*
The frame should be ignore
*/
#define H2_IGNORED 0xe
/*
SETTING FRAME CONSTs
*/
/*
When set, bit 0 indicates that this frame acknowledges
receipt and application of the peer's SETTINGS frame. When this
bit is set, the payload of the SETTINGS frame MUST be empty.
Receipt of a SETTINGS frame with the ACK flag set and a length
field value other than 0 MUST be treated as a connection error
(Section 5.4.1) of type FRAME_SIZE_ERROR
*/
#define H2_SETTING_ACK_FLG 0x1
#define H2_SETTINGS_HEADER_TABLE_SIZE 0x1
#define H2_SETTINGS_ENABLE_PUSH 0x2
#define H2_SETTINGS_MAX_CONCURRENT_STREAMS 0x3
#define H2_SETTINGS_INITIAL_WINDOW_SIZE 0x4
#define H2_SETTINGS_MAX_FRAME_SIZE 0x5
#define H2_SETTINGS_MAX_HEADER_LIST_SIZE 0x6
typedef struct{
uint32_t header_table_sz;
uint32_t enable_push;
uint32_t max_concurrent_streams;
uint32_t init_win_sz;
uint32_t max_frame_sz;
uint32_t max_header_list_sz;
} antd_h2_conn_setting_t;
typedef struct antd_h2_stream_list_t* antd_h2_stream_list_t;
/** /**
* Struct that holds a * Struct that holds a
* h2 connection * h2 connection
*/ */
typedef struct { typedef struct {
antd_h2_conn_setting_t settings;
antd_h2_stream_list_t* streams;
int win_sz;
int last_stream_id;
} antd_h2_conn_t; } antd_h2_conn_t;
#define H2_CONN(rq) ((antd_h2_conn_t*)dvalue(((antd_request_t*)rq)->request,"H2_CONNECTION"))
#define H2_SETTING(rq) (H2_CON(rq)->settings)
/** /**
* Struct that holds a * Struct that holds a
* h2 stream * h2 stream
*/ */
typedef struct { typedef struct {
struct queue_root* stdin;
struct queue_root* stdout;
int win_sz;
int id;
} antd_h2_stream_t; } antd_h2_stream_t;
/** /**
@ -121,13 +167,21 @@ typedef struct {
unsigned int identifier; unsigned int identifier;
} antd_h2_frame_header_t; } antd_h2_frame_header_t;
/*stream utilities functions*/
void antd_h2_close_stream(antd_h2_stream_t* stream);
void antd_h2_add_stream(antd_h2_stream_list_t*, antd_h2_stream_t*);
antd_h2_stream_t* antd_h2_get_stream(antd_h2_stream_list_t*, int);
void antd_h2_del_stream(antd_h2_stream_list_t*, int);
void antd_h2_close_all_streams(antd_h2_stream_list_t);
void antd_h2_update_streams_win_sz(antd_h2_stream_list_t streams, int offset);
/*Connection utilities funtions*/
antd_h2_conn_t* antd_h2_open_conn();
void antd_h2_close_conn(antd_h2_conn_t*);
void* antd_h2_read(void* rq); void* antd_h2_read(void* rq);
void* antd_h2_write(void* rq); void* antd_h2_write(void* rq);
void* antd_h2_preface_ck(void* rq); void* antd_h2_preface_ck(void* rq);
void* antd_h2_handle(void* rq); void* antd_h2_handle(void* rq);
int antd_h2_send_frame(antd_request_t*, antd_h2_frame_header_t*, uint8_t*);
#endif #endif

View File

@ -1,5 +1,4 @@
#include "handle.h" #include "handle.h"
#define HTML_TPL "<HTML><HEAD><TITLE>%s</TITLE></HEAD><BODY><h2>%s</h2></BODY></HTML>" #define HTML_TPL "<HTML><HEAD><TITLE>%s</TITLE></HEAD><BODY><h2>%s</h2></BODY></HTML>"
static const char* S_100 = "Continue"; static const char* S_100 = "Continue";
@ -850,35 +849,3 @@ int read_buf(void* sock, char*buf,int size)
buf[i] = '\0'; buf[i] = '\0';
return i; return i;
} }
/*
We put it here since we want the plugin is able
to destroy the request if it want to
in this case, the plugin should return an empty
with no data
*/
void destroy_request(void *data)
{
if (!data)
return;
antd_request_t *rq = (antd_request_t *)data;
//LOG("Close request %d", rq->client->sock);
// free all other thing
if (rq->request)
{
dictionary_t tmp = dvalue(rq->request, "COOKIE");
if (tmp)
freedict(tmp);
tmp = dvalue(rq->request, "REQUEST_HEADER");
if (tmp)
freedict(tmp);
tmp = dvalue(rq->request, "REQUEST_DATA");
if (tmp)
freedict(tmp);
dput(rq->request, "REQUEST_HEADER", NULL);
dput(rq->request, "REQUEST_DATA", NULL);
dput(rq->request, "COOKIE", NULL);
freedict(rq->request);
}
antd_close(rq->client);
free(rq);
}

View File

@ -142,5 +142,4 @@ int read_buf(void* sock,char* buf,int i);
int antd_send( void *source, const void* data, int len); int antd_send( void *source, const void* data, int len);
int antd_recv( void *source, void* data, int len); int antd_recv( void *source, void* data, int len);
int antd_close(void* source); int antd_close(void* source);
void destroy_request(void *data);
#endif #endif

84
lib/queue.c Normal file
View File

@ -0,0 +1,84 @@
// code base on: https://github.com/majek/dump/blob/master/msqueue/queue_semiblocking.c
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include "queue.h"
#define QUEUE_POISON1 ((void*)0xCAFEBAB5)
struct queue_root {
struct queue_head *in_queue;
struct queue_head *out_queue;
// pthread_spinlock_t lock;
pthread_mutex_t lock;
};
#ifndef _cas
# define _cas(ptr, oldval, newval) \
__sync_bool_compare_and_swap(ptr, oldval, newval)
#endif
struct queue_root *ALLOC_QUEUE_ROOT()
{
struct queue_root *root = \
malloc(sizeof(struct queue_root));
// pthread_spin_init(&root->lock, PTHREAD_PROCESS_PRIVATE);
pthread_mutex_init(&root->lock, NULL);
root->in_queue = NULL;
root->out_queue = NULL;
return root;
}
void INIT_QUEUE_HEAD(struct queue_head *head)
{
head->next = QUEUE_POISON1;
}
void queue_put(struct queue_head *new,
struct queue_root *root)
{
while (1) {
struct queue_head *in_queue = root->in_queue;
new->next = in_queue;
if (_cas(&root->in_queue, in_queue, new)) {
break;
}
}
}
struct queue_head *queue_get(struct queue_root *root)
{
// pthread_spin_lock(&root->lock);
pthread_mutex_lock(&root->lock);
if (!root->out_queue) {
while (1) {
struct queue_head *head = root->in_queue;
if (!head) {
break;
}
if (_cas(&root->in_queue, head, NULL)) {
// Reverse the order
while (head) {
struct queue_head *next = head->next;
head->next = root->out_queue;
root->out_queue = head;
head = next;
}
break;
}
}
}
struct queue_head *head = root->out_queue;
if (head) {
root->out_queue = head->next;
}
// pthread_spin_unlock(&root->lock);
pthread_mutex_unlock(&root->lock);
return head;
}

20
lib/queue.h Normal file
View File

@ -0,0 +1,20 @@
// code based on : https://github.com/majek/dump/blob/master/msqueue/queue.h
#ifndef QUEUE_H
#define QUEUE_H
struct queue_root;
struct queue_head {
void* data;
struct queue_head *next;
};
struct queue_root *ALLOC_QUEUE_ROOT();
void INIT_QUEUE_HEAD(struct queue_head *head);
void queue_put(struct queue_head *,struct queue_root *);
struct queue_head *queue_get(struct queue_root *root);
#endif // QUEUE_H