fix race bug

This commit is contained in:
Xuan Sang LE 2018-09-27 17:18:31 +02:00
parent cbd574cbc0
commit 428c6ffb89
3 changed files with 38 additions and 39 deletions

View File

@ -146,19 +146,36 @@ static void destroy_queue(antd_task_queue_t q)
free(curr);
}
}
static int antd_has_pending_task()
{
int ret = 0;
for(int i = 0; i < N_PRIORITY; i++)
if(scheduler.task_queue[i] != NULL)
{
ret = 1;
break;
}
if(!ret)
{
ret = (scheduler.workers_queue != NULL);
}
return ret;
}
static int antd_available_workers()
{
int n = 0;
//pthread_mutex_lock(&scheduler.worker_lock);
for(int i=0; i < scheduler.n_workers; i++)
if(scheduler.workers[i].status == 0) n++;
//pthread_mutex_unlock(&scheduler.worker_lock);
return n;
}
/*
Main API methods
init the main scheduler
*/
int antd_available_workers()
{
int n = 0;
pthread_mutex_lock(&scheduler.worker_lock);
for(int i=0; i < scheduler.n_workers; i++)
if(scheduler.workers[i].status == 0) n++;
pthread_mutex_unlock(&scheduler.worker_lock);
return n;
}
/*
* assign task to a worker
*/
@ -303,32 +320,17 @@ void antd_execute_task(antd_task_item_t taski)
}
}
}
int antd_has_pending_task()
{
int ret = 0;
pthread_mutex_lock(&scheduler.scheduler_lock);
for(int i = 0; i < N_PRIORITY; i++)
if(scheduler.task_queue[i] != NULL)
{
ret = 1;
break;
}
pthread_mutex_unlock(&scheduler.scheduler_lock);
if(!ret)
{
pthread_mutex_lock(&scheduler.queue_lock);
ret = (scheduler.workers_queue != NULL);
pthread_mutex_unlock(&scheduler.queue_lock);
}
return ret;
}
int antd_scheduler_busy()
{
if(antd_available_workers() != scheduler.n_workers) return 1;
//return 0;
return antd_has_pending_task();
pthread_mutex_lock(&scheduler.worker_lock);
pthread_mutex_lock(&scheduler.scheduler_lock);
pthread_mutex_lock(&scheduler.queue_lock);
int ret = (antd_available_workers() != scheduler.n_workers) || antd_has_pending_task();
pthread_mutex_unlock(&scheduler.queue_lock);
pthread_mutex_unlock(&scheduler.scheduler_lock);
pthread_mutex_unlock(&scheduler.worker_lock);
return ret;
}
int antd_scheduler_status()
{
@ -362,7 +364,6 @@ void antd_task_schedule()
{
// delegate to other workers by
//pushing to the worker queue
LOG("delegate to workers\n");
pthread_mutex_lock(&scheduler.queue_lock);
enqueue(&scheduler.workers_queue, it->task);
free(it);

View File

@ -106,6 +106,4 @@ void antd_execute_task(antd_task_item_t);
int antd_scheduler_busy();
void antd_attach_task(antd_worker_t* worker);
void antd_task_schedule();
int antd_available_workers();
int antd_has_pending_task();
#endif

View File

@ -85,9 +85,9 @@ int main(int argc, char* argv[])
signal(SIGABRT, SIG_IGN);
signal(SIGINT, stop_serve);
server_sock = startup(&port);
struct timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = 500;
//struct timeval timeout;
//timeout.tv_sec = 0;
//timeout.tv_usec = 500;
// 0 worker
antd_scheduler_init(0);
// set server socket to non blocking