diff --git a/libs/scheduler.c b/libs/scheduler.c index 8f7dc41..3db1213 100644 --- a/libs/scheduler.c +++ b/libs/scheduler.c @@ -146,19 +146,36 @@ static void destroy_queue(antd_task_queue_t q) free(curr); } } +static int antd_has_pending_task() +{ + int ret = 0; + + for(int i = 0; i < N_PRIORITY; i++) + if(scheduler.task_queue[i] != NULL) + { + ret = 1; + break; + } + if(!ret) + { + ret = (scheduler.workers_queue != NULL); + } + + return ret; +} +static int antd_available_workers() +{ + int n = 0; + //pthread_mutex_lock(&scheduler.worker_lock); + for(int i=0; i < scheduler.n_workers; i++) + if(scheduler.workers[i].status == 0) n++; + //pthread_mutex_unlock(&scheduler.worker_lock); + return n; +} /* Main API methods init the main scheduler */ -int antd_available_workers() -{ - int n = 0; - pthread_mutex_lock(&scheduler.worker_lock); - for(int i=0; i < scheduler.n_workers; i++) - if(scheduler.workers[i].status == 0) n++; - pthread_mutex_unlock(&scheduler.worker_lock); - return n; -} /* * assign task to a worker */ @@ -303,32 +320,17 @@ void antd_execute_task(antd_task_item_t taski) } } } -int antd_has_pending_task() -{ - int ret = 0; - pthread_mutex_lock(&scheduler.scheduler_lock); - for(int i = 0; i < N_PRIORITY; i++) - if(scheduler.task_queue[i] != NULL) - { - ret = 1; - break; - } - pthread_mutex_unlock(&scheduler.scheduler_lock); - if(!ret) - { - pthread_mutex_lock(&scheduler.queue_lock); - ret = (scheduler.workers_queue != NULL); - pthread_mutex_unlock(&scheduler.queue_lock); - } - - return ret; -} + int antd_scheduler_busy() { - - if(antd_available_workers() != scheduler.n_workers) return 1; - //return 0; - return antd_has_pending_task(); + pthread_mutex_lock(&scheduler.worker_lock); + pthread_mutex_lock(&scheduler.scheduler_lock); + pthread_mutex_lock(&scheduler.queue_lock); + int ret = (antd_available_workers() != scheduler.n_workers) || antd_has_pending_task(); + pthread_mutex_unlock(&scheduler.queue_lock); + pthread_mutex_unlock(&scheduler.scheduler_lock); + pthread_mutex_unlock(&scheduler.worker_lock); + return ret; } int antd_scheduler_status() { @@ -362,7 +364,6 @@ void antd_task_schedule() { // delegate to other workers by //pushing to the worker queue - LOG("delegate to workers\n"); pthread_mutex_lock(&scheduler.queue_lock); enqueue(&scheduler.workers_queue, it->task); free(it); diff --git a/libs/scheduler.h b/libs/scheduler.h index de55a49..e025fa7 100644 --- a/libs/scheduler.h +++ b/libs/scheduler.h @@ -106,6 +106,4 @@ void antd_execute_task(antd_task_item_t); int antd_scheduler_busy(); void antd_attach_task(antd_worker_t* worker); void antd_task_schedule(); -int antd_available_workers(); -int antd_has_pending_task(); #endif \ No newline at end of file diff --git a/relay.c b/relay.c index 4274dcd..48b64e4 100644 --- a/relay.c +++ b/relay.c @@ -85,9 +85,9 @@ int main(int argc, char* argv[]) signal(SIGABRT, SIG_IGN); signal(SIGINT, stop_serve); server_sock = startup(&port); - struct timeval timeout; - timeout.tv_sec = 0; - timeout.tv_usec = 500; + //struct timeval timeout; + //timeout.tv_sec = 0; + //timeout.tv_usec = 500; // 0 worker antd_scheduler_init(0); // set server socket to non blocking