From d62b635995ea5be368bc62d1c34c16cf929338bc Mon Sep 17 00:00:00 2001 From: lxsang Date: Wed, 3 Feb 2021 17:56:46 +0100 Subject: [PATCH] Log worker task in statistic --- lib/scheduler.c | 75 ++++++++++++++++++++++++++++++++----------------- 1 file changed, 49 insertions(+), 26 deletions(-) diff --git a/lib/scheduler.c b/lib/scheduler.c index 6137dc3..5b529d6 100644 --- a/lib/scheduler.c +++ b/lib/scheduler.c @@ -49,6 +49,7 @@ typedef struct { int id; pthread_t tid; + antd_task_t* current_task; void *manager; } antd_worker_t; @@ -229,6 +230,7 @@ static void *work(antd_worker_t *worker) pthread_mutex_lock(&scheduler->worker_lock); it = dequeue(&scheduler->workers_queue); pthread_mutex_unlock(&scheduler->worker_lock); + worker->current_task = it->task; // execute the task //LOG("task executed by worker %d\n", worker->pid); // no task to execute, just sleep wait @@ -242,45 +244,54 @@ static void *work(antd_worker_t *worker) //LOG("task executed by worker %d\n", worker->id); antd_execute_task(scheduler, it->task); free(it); + worker->current_task = NULL; } } return NULL; } +static void antd_task_dump(int fd, antd_task_t* task, char* buffer) +{ + if (task == NULL || fd < 0) + { + return; + } + int ret; + // send statistic on task data + snprintf(buffer, MAX_FIFO_NAME_SZ, "---- Task %d created at: %lu ----\n", task->id, task->stamp); + ret = write(fd, buffer, strlen(buffer)); + + // send statistic on task data + snprintf(buffer, MAX_FIFO_NAME_SZ, "Access time: %lu\nn", (unsigned long)task->access_time); + ret = write(fd, buffer, strlen(buffer)); + + snprintf(buffer, MAX_FIFO_NAME_SZ, "Current time: %lu\n", (unsigned long)time(NULL)); + ret = write(fd, buffer, strlen(buffer)); + + if (task->handle) + { + snprintf(buffer, MAX_FIFO_NAME_SZ, "Has handle: yes\n"); + ret = write(fd, buffer, strlen(buffer)); + } + + if (task->callback) + { + snprintf(buffer, MAX_FIFO_NAME_SZ, "Has callback: yes\n"); + ret = write(fd, buffer, strlen(buffer)); + } + UNUSED(ret); + // now print all task data statistic + antd_scheduler_ext_statistic(fd, task->data); +} static void print_static_info(bst_node_t *node, void **args, int argc) { if (argc != 2) { return; } - int ret; char *buffer = args[0]; int *fdp = args[1]; antd_task_t *task = (antd_task_t *)node->data; - // send statistic on task data - snprintf(buffer, MAX_FIFO_NAME_SZ, "---- Task %d created at: %lu ----\n", task->id, task->stamp); - ret = write(*fdp, buffer, strlen(buffer)); - - // send statistic on task data - snprintf(buffer, MAX_FIFO_NAME_SZ, "Access time: %lu\nn", (unsigned long)task->access_time); - ret = write(*fdp, buffer, strlen(buffer)); - - snprintf(buffer, MAX_FIFO_NAME_SZ, "Current time: %lu\n", (unsigned long)time(NULL)); - ret = write(*fdp, buffer, strlen(buffer)); - - if (task->handle) - { - snprintf(buffer, MAX_FIFO_NAME_SZ, "Has handle: yes\n"); - ret = write(*fdp, buffer, strlen(buffer)); - } - - if (task->callback) - { - snprintf(buffer, MAX_FIFO_NAME_SZ, "Has callback: yes\n"); - ret = write(*fdp, buffer, strlen(buffer)); - } - UNUSED(ret); - // now print all task data statistic - antd_scheduler_ext_statistic(*fdp, task->data); + antd_task_dump(*fdp, task, buffer); } static void *statistic(antd_scheduler_t *scheduler) { @@ -331,6 +342,18 @@ static void *statistic(antd_scheduler_t *scheduler) bst_for_each(scheduler->task_queue, print_static_info, argc, 2); pthread_mutex_unlock(&scheduler->scheduler_lock); + + // write worker current task + for (int i = 0; i < scheduler->n_workers; i++) + { + snprintf(buffer, MAX_FIFO_NAME_SZ, "Worker: %d. Detail:\n", i); + ret = write(scheduler->stat_fd, buffer, strlen(buffer)); + if(scheduler->workers[i].current_task) + { + antd_task_dump(scheduler->stat_fd, scheduler->workers[i].current_task, buffer); + } + } + ret = close(scheduler->stat_fd); scheduler->stat_fd = -1; usleep(5000);