// code base on: https://github.com/majek/dump/blob/master/msqueue/queue_semiblocking.c #include #include #include #include "queue.h" #define QUEUE_POISON1 ((void*)0xCAFEBAB5) struct queue_root { struct queue_head *in_queue; struct queue_head *out_queue; // pthread_spinlock_t lock; pthread_mutex_t lock; }; #ifndef _cas # define _cas(ptr, oldval, newval) \ __sync_bool_compare_and_swap(ptr, oldval, newval) #endif struct queue_root *ALLOC_QUEUE_ROOT() { struct queue_root *root = \ malloc(sizeof(struct queue_root)); // pthread_spin_init(&root->lock, PTHREAD_PROCESS_PRIVATE); pthread_mutex_init(&root->lock, NULL); root->in_queue = NULL; root->out_queue = NULL; return root; } void INIT_QUEUE_HEAD(struct queue_head *head) { head->next = QUEUE_POISON1; } void queue_put(struct queue_head *new, struct queue_root *root) { while (1) { struct queue_head *in_queue = root->in_queue; new->next = in_queue; if (_cas(&root->in_queue, in_queue, new)) { break; } } } int queue_readable(struct queue_root *root) { return root->out_queue != NULL && root->out_queue != QUEUE_POISON1; } void queue_empty(struct queue_root * root, void (* fn)(void*)) { struct queue_head* head = NULL; while((head = queue_get(root)) != NULL) { if(fn) { fn(head->data); } free(head); } } struct queue_head *queue_get(struct queue_root *root) { // pthread_spin_lock(&root->lock); pthread_mutex_lock(&root->lock); if (!root->out_queue) { while (1) { struct queue_head *head = root->in_queue; if (!head) { break; } if (_cas(&root->in_queue, head, NULL)) { // Reverse the order while (head) { struct queue_head *next = head->next; head->next = root->out_queue; root->out_queue = head; head = next; } break; } } } struct queue_head *head = root->out_queue; if (head) { root->out_queue = head->next; } // pthread_spin_unlock(&root->lock); pthread_mutex_unlock(&root->lock); return head; }