diff --git a/Makefile.am b/Makefile.am index 52cff3d..7310ec3 100644 --- a/Makefile.am +++ b/Makefile.am @@ -15,7 +15,7 @@ install-data-local: EXTRA_DIST = runner.ini runnerd tunnel.h antd-tunnel-publisher.service log.h -SUBDIRS = . vterm wfifo syslog +SUBDIRS = . vterm wfifo syslog broadcast if ENABLE_CAM SUBDIRS += v4l2cam diff --git a/broadcast/Makefile.am b/broadcast/Makefile.am new file mode 100644 index 0000000..f6ae9f6 --- /dev/null +++ b/broadcast/Makefile.am @@ -0,0 +1,11 @@ +AUTOMAKE_OPTIONS = foreign + + + +AM_CPPFLAGS = -W -Wall -g -std=c99 + +# bin +bin_PROGRAMS = broadcast +# source files +broadcast_SOURCES = broadcast.c ../tunnel.c +broadcast_CPPFLAGS= -I../ diff --git a/broadcast/broadcast b/broadcast/broadcast new file mode 100755 index 0000000..f8f214a Binary files /dev/null and b/broadcast/broadcast differ diff --git a/broadcast/broadcast.c b/broadcast/broadcast.c new file mode 100644 index 0000000..c520607 --- /dev/null +++ b/broadcast/broadcast.c @@ -0,0 +1,456 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "../tunnel.h" + +#define BC_ERROR(r, fd, c, ...) \ + do \ + { \ + r.header.client_id = c; \ + r.header.type = CHANNEL_ERROR; \ + (void)snprintf(r.data, BUFFLEN, ##__VA_ARGS__); \ + r.header.size = strlen(r.data); \ + M_ERROR(MODULE_NAME, "%s", r.data); \ + (void)msg_write(fd, &r); \ + } while (0) + +#define MODULE_NAME "broadcast" + +#define BC_SUBSCRIPTION 0x0A +#define BC_UNSUBSCRIPTION 0x0B +#define BC_QUERY_GROUP 0x0C + +#define MAX_STR_LEN 255 + +typedef struct +{ + int ref; + char name[MAX_STR_LEN]; + bst_node_t *groups; +} bc_client_t; + +/** + * @brief Send notified to client. + * + * @param type + * @param bc_client broadcast client handle + * @param groupname group name + */ +static void bc_notify(bst_node_t *node, void **argv, int argc); +static void int_handler(int dummy); +static void bc_get_handle(bst_node_t *node, void **argv, int argc); +static void bc_unsubscription(bst_node_t *node, void **args, int argc); +static void unsubscribe(bst_node_t *node, void **args, int argc); +static void bc_send_query(bst_node_t *node, void **argv, int argc); + +static bst_node_t *clients = NULL; +static uint8_t msg_buffer[BUFFLEN]; +static volatile int running = 1; + +static void int_handler(int dummy) +{ + (void)dummy; + running = 0; +} + +static void bc_get_handle(bst_node_t *node, void **argv, int argc) +{ + (void)argc; + bc_client_t **bc_client = (bc_client_t **)argv[1]; + char *name = (char *)argv[2]; + bc_client_t *node_client = (bc_client_t *)node->data; + if (bc_client == NULL || name == NULL || node->data == NULL) + { + return; + } + M_DEBUG(MODULE_NAME, "comparing %s vs %s", name, node_client->name); + if (strcmp(name, node_client->name) == 0) + { + M_LOG(MODULE_NAME, "Handle for user %s exits (ref %d)", name, node_client->ref); + *bc_client = node_client; + } +} +static void bc_unsubscription(bst_node_t *node, void **args, int argc) +{ + (void)argc; + tunnel_msg_t *msg = (tunnel_msg_t *)args[1]; + int len = *((int *)args[3]) + 2; + if (!node) + return; + // write hash value to data + int hash = node->key; + uint32_t net32 = htonl(hash); + (void)memcpy(&msg->data[len], &net32, sizeof(net32)); + msg->header.size = len + sizeof(net32); + args[2] = &hash; + M_DEBUG(MODULE_NAME, "All clients subscribed to the groupe %d is notified that user is leaving", hash); + bst_for_each(clients, bc_notify, args, 3); +} +static void unsubscribe(bst_node_t *node, void **args, int argc) +{ + (void)argc; + tunnel_msg_t msg; + int *ufd = (int *)args[0]; + int len; + bc_client_t *bc_client = (bc_client_t *)node->data; + void *bc_argv[] = {ufd, &msg, 0, &len}; + if (!node || !node->data) + { + return; + } + bc_client->ref--; + // notify all clients in our groups that we're done + msg.header.type = CHANNEL_CTRL; + msg.data = msg_buffer; + msg.data[0] = BC_UNSUBSCRIPTION; + len = strlen(bc_client->name); + msg.data[1] = (uint8_t)len; + (void)memcpy(&msg.data[2], bc_client->name, len); + // group name + // unsubscribe + if (bc_client->ref <= 0) + { + M_DEBUG(MODULE_NAME, "User %s is leaving all its subscribed groups (%d)", bc_client->name, bc_client->groups == NULL); + bst_for_each(bc_client->groups, bc_unsubscription, bc_argv, 3); + M_DEBUG(MODULE_NAME, "Handle for user %s ref is %d, free handle data", bc_client->name, bc_client->ref); + bst_free(bc_client->groups); + free(node->data); + } + msg.header.type = CHANNEL_UNSUBSCRIBE; + msg.header.client_id = node->key; + msg.header.size = 0; + msg.data = NULL; + if (msg_write(*ufd, &msg) == -1) + { + M_ERROR(MODULE_NAME, "Unable to request unsubscribe to client %d", node->key); + } +} +static void bc_send_query(bst_node_t *node, void **argv, int argc) +{ + (void)argc; + int *fd = (int *)argv[0]; + tunnel_msg_t *msg = (tunnel_msg_t *)argv[1]; + bc_client_t *bc_client = NULL; + int *group = (int *)argv[2]; + int len = *(int *)argv[3]; + if (!node->data) + { + return; + } + bc_client = (bc_client_t *)node->data; + if (bst_find(bc_client->groups, *group) == NULL) + { + return; + } + (void)memcpy(&msg->data[len], bc_client->name, strlen(bc_client->name)); + msg->header.size = len + strlen(bc_client->name); + if (msg_write(*fd, msg) == -1) + { + M_ERROR(MODULE_NAME, "Unable to write notify message to client %d", node->key); + } +} +static void bc_notify(bst_node_t *node, void **argv, int argc) +{ + (void)argc; + int *fd = (int *)argv[0]; + tunnel_msg_t *msg = (tunnel_msg_t *)argv[1]; + bc_client_t *bc_client = NULL; + int *group = (int *)argv[2]; + if (!node->data) + { + return; + } + bc_client = (bc_client_t *)node->data; + if (bst_find(bc_client->groups, *group) == NULL) + { + return; + } + msg->header.client_id = node->key; + if (msg_write(*fd, msg) == -1) + { + M_ERROR(MODULE_NAME, "Unable to write notify message to client %d", node->key); + } + M_DEBUG(MODULE_NAME, "Notify message sent to client %d", node->key); +} +int main(int argc, char **argv) +{ + int fd, hash; + size_t len; + tunnel_msg_t request, response; + fd_set fd_in; + int status, length; + char name[MAX_STR_LEN + 1]; + void *fargv[4]; + bst_node_t *node_p; + uint32_t net32; + bc_client_t *bc_client; + LOG_INIT(MODULE_NAME); + if (argc != 3) + { + printf("Usage: %s path/to/hotline/socket channel_name\n", argv[0]); + return -1; + } + signal(SIGPIPE, SIG_IGN); + signal(SIGABRT, SIG_IGN); + signal(SIGINT, int_handler); + // now try to request new channel from hotline + fd = open_unix_socket(argv[1]); + if (fd == -1) + { + M_ERROR(MODULE_NAME, "Unable to open the hotline: %s", argv[1]); + return -1; + } + request.header.type = CHANNEL_OPEN; + request.header.channel_id = 0; + request.header.client_id = 0; + M_LOG(MODULE_NAME, "Request to open the channel %s", argv[2]); + (void)strncpy(name, argv[2], sizeof(name)); + request.header.size = strlen(name); + request.data = (uint8_t *)name; + if (msg_write(fd, &request) == -1) + { + M_ERROR(MODULE_NAME, "Unable to write message to hotline"); + (void)close(fd); + return -1; + } + M_DEBUG(MODULE_NAME, "Wait for comfirm creation of %s", argv[2]); + // now wait for message + if (msg_read(fd, &response) == -1) + { + M_ERROR(MODULE_NAME, "Unable to read message from hotline"); + (void)close(fd); + return -1; + } + if (response.header.type == CHANNEL_OK) + { + M_LOG(MODULE_NAME, "Channel created: %s", argv[2]); + } + else + { + M_ERROR(MODULE_NAME, "Channel is not created: %s. Tunnel service responds with msg of type %d", argv[2], response.header.type); + running = 0; + } + if (response.data) + free(response.data); + // now read data + fargv[0] = (void *)&fd; + while (running) + { + FD_ZERO(&fd_in); + FD_SET(fd, &fd_in); + status = select(fd + 1, &fd_in, NULL, NULL, NULL); + + switch (status) + { + case -1: + M_ERROR(MODULE_NAME, "Error %d on select()\n", errno); + running = 0; + break; + case 0: + break; + // we have data + default: + if (msg_read(fd, &request) == -1) + { + M_ERROR(MODULE_NAME, "Unable to read message from channel. quit"); + running = 0; + } + else + { + switch (request.header.type) + { + case CHANNEL_SUBSCRIBE: + if (request.header.size > MAX_STR_LEN - 1) + { + M_ERROR(MODULE_NAME, "User name string overflow"); + } + else + { + node_p = bst_find(clients, request.header.client_id); + if (node_p) + { + M_LOG(MODULE_NAME, "Client %d is already subscript to this channel", request.header.client_id); + } + { + // store user name + bc_client = NULL; + (void)memcpy(name, request.data, request.header.size); + name[request.header.size] = '\0'; + fargv[1] = &bc_client; + fargv[2] = name; + bst_for_each(clients, bc_get_handle, fargv, 3); + if (bc_client) + { + bc_client->ref++; + } + else + { + bc_client = (bc_client_t *)malloc(sizeof(bc_client_t)); + (void)strncpy(bc_client->name, name, MAX_STR_LEN); + bc_client->groups = NULL; + bc_client->ref = 1; + } + clients = bst_insert(clients, request.header.client_id, bc_client); + M_LOG(MODULE_NAME, "Client %s (%d) subscribes to the chanel (ref %d)", bc_client->name, request.header.client_id, bc_client->ref); + } + } + break; + case CHANNEL_CTRL: + /** + * @brief message in format [code][group name] + * + */ + if (request.header.size > 0u && request.header.size <= MAX_STR_LEN) + { + node_p = bst_find(clients, request.header.client_id); + if (node_p && node_p->data) + { + bc_client = (bc_client_t *)node_p->data; + fargv[1] = &response; + fargv[2] = &hash; + response.header.channel_id = request.header.channel_id; + response.header.type = CHANNEL_CTRL; + response.data = msg_buffer; + response.data[0] = request.data[0]; + switch (request.data[0]) + { + case BC_SUBSCRIPTION: + case BC_UNSUBSCRIPTION: + /** + * @brief * The notify message is in the following format + * [1 byte type][1 byte user name size][user name][4byte gid][groupname (optional)] + * + */ + + if (request.data[0] == BC_SUBSCRIPTION) + { + memcpy(name, &request.data[1], request.header.size - 1u); + name[request.header.size - 1u] = '\0'; + hash = simple_hash(name); + bc_client->groups = (void *)bst_insert(bc_client->groups, hash, NULL); + M_LOG(MODULE_NAME, "Client %d subscription to broadcast group: %s (%d)", request.header.client_id, name, hash); + } + else + { + name[0] = '\0'; + (void)memcpy(&hash, &request.data[1], sizeof(hash)); + hash = ntohl(hash); + } + len = strlen(bc_client->name); + response.data[1] = (uint8_t)len; + (void)memcpy(&response.data[2], bc_client->name, len); + len += 2u; + // group name + net32 = htonl(hash); + (void)memcpy(&response.data[len], &net32, sizeof(net32)); + len += sizeof(net32); + (void)memcpy(&response.data[len], name, strlen(name)); + response.header.size = len + strlen(name); + bst_for_each(clients, bc_notify, fargv, 3); + if (request.data[0] == BC_UNSUBSCRIPTION) + { + bc_client->groups = (void *)bst_delete(bc_client->groups, hash); + M_LOG(MODULE_NAME, "Client %d leaves broadcast group: %d", request.header.client_id, hash); + } + break; + + case BC_QUERY_GROUP: + (void)memcpy(&hash, &request.data[1], sizeof(hash)); + hash = ntohl(hash); + net32 = htonl(hash); + (void)memcpy(&response.data[1], &net32, sizeof(net32)); + len = len = sizeof(net32) + 1u; + fargv[3] = &len; + if (bst_find(bc_client->groups, hash) == NULL) + { + BC_ERROR(response, fd, request.header.client_id, "Client %d query a group that it does not belong to", request.header.client_id); + } + else + { + // data format [type][4bytes group][user] + M_LOG(MODULE_NAME, "Client %d query group: %s (%d)", request.header.client_id, name, hash); + bst_for_each(clients, bc_send_query, fargv, 4); + } + break; + default: + BC_ERROR(response, fd, request.header.client_id, "Invalid client control message: 0x%.2X", request.data[0]); + break; + } + } + else + { + BC_ERROR(response, fd, request.header.client_id, "Client %d does not previously subscribe to the channel", request.header.client_id); + } + } + else + { + BC_ERROR(response, fd, request.header.client_id, "Invalid CTRL message size: %d", request.header.size); + } + break; + case CHANNEL_DATA: + /** + * @brief Send message to a group of client + * message is in the following format + * [4bytes group id][data] + */ + (void)memcpy(&hash, &request.data[0], sizeof(hash)); + hash = ntohl(hash); + fargv[1] = &request; + fargv[2] = &hash; + bst_for_each(clients, bc_notify, fargv, 3); + break; + case CHANNEL_UNSUBSCRIBE: + M_LOG(MODULE_NAME, "Client %d unsubscribes to the chanel", request.header.client_id); + + node_p = bst_find(clients, request.header.client_id); + unsubscribe(node_p, fargv, 1); + clients = bst_delete(clients, request.header.client_id); + break; + + default: + M_LOG(MODULE_NAME, "Client %d send message of type %d", + request.header.client_id, request.header.type); + break; + } + if (request.data) + { + free(request.data); + } + } + } + } + // unsubscribe all client + bst_for_each(clients, unsubscribe, fargv, 1); + bst_free(clients); + // close the channel + M_LOG(MODULE_NAME, "Close the channel %s (%d)", argv[2], fd); + request.header.type = CHANNEL_CLOSE; + request.header.size = 0; + request.data = NULL; + if (msg_write(fd, &request) == -1) + { + M_ERROR(MODULE_NAME, "Unable to request channel close"); + } + + if (msg_read(fd, &response) == 0) + { + if (response.data) + { + free(response.data); + } + } + (void)close(fd); + return 0; +} \ No newline at end of file diff --git a/configure.ac b/configure.ac index fe14e73..b00eb6b 100644 --- a/configure.ac +++ b/configure.ac @@ -1,5 +1,5 @@ # initialise autoconf and set up some basic information about the program we’re packaging -AC_INIT([antd-publishers], [0.1.1a], [xsang.le@gmail.com]) +AC_INIT([antd-publishers], [0.1.2a], [xsang.le@gmail.com]) # We’re going to use automake for this project # [subdir-objects] if needed @@ -89,6 +89,7 @@ AC_CONFIG_FILES([ vterm/Makefile wfifo/Makefile syslog/Makefile + broadcast/Makefile ]) if test x"${cam_enable}" == x"yes" ; then diff --git a/dist/antd-publishers-0.1.1a.tar.gz b/dist/antd-publishers-0.1.1a.tar.gz index a100359..35e24cc 100644 Binary files a/dist/antd-publishers-0.1.1a.tar.gz and b/dist/antd-publishers-0.1.1a.tar.gz differ diff --git a/dist/antd-publishers-0.1.2a.tar.gz b/dist/antd-publishers-0.1.2a.tar.gz new file mode 100644 index 0000000..973f498 Binary files /dev/null and b/dist/antd-publishers-0.1.2a.tar.gz differ diff --git a/log.h b/log.h index f1716f9..435ed79 100644 --- a/log.h +++ b/log.h @@ -2,18 +2,33 @@ #define LOG_H #include +#include -#define LOG_INIT(m) do { \ - setlogmask (LOG_UPTO (LOG_NOTICE)); \ - openlog ((m), LOG_CONS | LOG_PID | LOG_NDELAY, LOG_USER); \ - } while(0) +#define LOG_INIT(m) \ + do \ + { \ + if ((getenv("debug") != NULL) && (strcmp(getenv("debug"), "1") == 0)) \ + { \ + setlogmask(LOG_UPTO(LOG_INFO)); \ + M_LOG(MODULE_NAME, "DEBUG ENABLED"); \ + } \ + else \ + { \ + setlogmask(LOG_UPTO(LOG_NOTICE)); \ + } \ + openlog((m), LOG_CONS | LOG_PID | LOG_NDELAY, LOG_USER); \ + } while (0) -#ifdef DEBUG - #define M_LOG(m, a,...) syslog ((LOG_NOTICE),m "_log@[%s: %d]: " a "\n", __FILE__, \ - __LINE__, ##__VA_ARGS__) -#else - #define M_LOG(m, a,...) do{}while(0) -#endif -#define M_ERROR(m, a,...) syslog ((LOG_ERR),m "_error@[%s: %d]: " a "\n", __FILE__, \ - __LINE__, ##__VA_ARGS__) +#define M_LOG(m, a, ...) syslog((LOG_NOTICE), m "_log@[%s: %d]: " a "\n", __FILE__, \ + __LINE__, ##__VA_ARGS__) +#define M_DEBUG(m, a, ...) syslog((LOG_INFO), m "_log@[%s: %d]: " a "\n", __FILE__, \ + __LINE__, ##__VA_ARGS__) +#define M_ERROR(m, a, ...) syslog((LOG_ERR), m "_error@[%s: %d]: " a "\n", __FILE__, \ + __LINE__, ##__VA_ARGS__) +#define ASSERT(b, m, ...) \ + if (!(b)) \ + { \ + M_ERROR(MODULE_NAME, "ASSERT ERROR: " m, ##__VA_ARGS__); \ + assert(b); \ + } #endif \ No newline at end of file diff --git a/runner.c b/runner.c index bcc3f27..39529e8 100644 --- a/runner.c +++ b/runner.c @@ -6,115 +6,133 @@ #include #include #include -#include #include +#include #include #include "log.h" -#define MODULE_NAME "runner" +#define MODULE_NAME "runner" +#define MAX_STR_LEN 255u +/** up to 20 arguments*/ +#define MAX_ARGC 10u + +typedef struct +{ + char *name; + char strings[MAX_ARGC * 2u][MAX_STR_LEN]; + char *envs[MAX_ARGC + 1]; + char *params[MAX_ARGC + 1]; + size_t n_params; + size_t n_envs; + size_t s_p; +} runner_cmd_t; static volatile int running = 1; -static list_t pids; - -static void int_handler(int dummy) { - (void) dummy; +static runner_cmd_t cmd; +static void int_handler(int dummy) +{ + (void)dummy; running = 0; } +static void execute_command(list_t *plist) +{ + pid_t pid; + ASSERT(cmd.name != NULL, "Invalid service handler (NULL)"); + pid = fork(); + ASSERT(pid != -1, "Unable to fork: %s", strerror(errno)); + if (pid == 0) + { + execve(cmd.params[0], &cmd.params[0], &cmd.envs[0]); + // Nothing below this line should be executed by child process. If so, + // it means that the execl function wasn't successfull, so lets exit: + _exit(1); + } + else + { + M_LOG(MODULE_NAME, "Running service %s (%d)...", cmd.name, pid); + (void)memset(&cmd, 0, sizeof(cmd)); + list_put_i(plist, pid); + } +} static int ini_handle(void *user_data, const char *section, const char *name, const char *value) { - pid_t pid; - char* argv[11]; - list_t list; - item_t item; - int i; - UNUSED(user_data); - if (EQU(section, "RUNNER")) + list_t *plist = (list_t *)user_data; + ASSERT(cmd.s_p < MAX_ARGC * 2u, "String buffer overflow: %ld", cmd.s_p); + ASSERT(cmd.n_params <= MAX_ARGC, "Max arguments reached %ld", cmd.n_params); + ASSERT(cmd.n_envs <= MAX_ARGC, "Max environment variables reached %ld", cmd.n_envs); + if ((cmd.name == NULL) || ! EQU(section, cmd.name)) { - if(EQU(name, "service")) + if (cmd.params[0]) { - list = split(value, " "); - i = list_size(list); - if( i > 10) - { - M_ERROR(MODULE_NAME, "Too many arguments %d, expected max 10", i); - return 0; - } - i = 0; - list_for_each(item, list) - { - argv[i] = item->value.ptr; - i++; - } - argv[i] = NULL; - M_LOG(MODULE_NAME, "Running service %s...", value); - pid = fork(); - if(pid == -1) - { - M_ERROR(MODULE_NAME, "Unable to fork: %s", strerror(errno)); - return 0; - } - if(pid == 0) - { - // child - execve(argv[0], &argv[0], NULL); - // Nothing below this line should be executed by child process. If so, - // it means that the execl function wasn't successfull, so lets exit: - _exit(1); - } - list_free(&list); - // parent - list_put_i(&pids, pid); - } - else - { - M_ERROR(MODULE_NAME, "Ignore unknown configuration %s = %s", name, value); - return 0; + execute_command(plist); } + cmd.n_params = 1u; + (void)strncpy(cmd.strings[cmd.s_p], section, MAX_STR_LEN); + cmd.name = cmd.strings[cmd.s_p]; + cmd.s_p++; + } + if (EQU(name, "exec")) + { + (void)strncpy(cmd.strings[cmd.s_p], value, MAX_STR_LEN); + cmd.params[0] = cmd.strings[cmd.s_p]; + cmd.s_p++; + } + if (EQU(name, "param")) + { + (void)strncpy(cmd.strings[cmd.s_p], value, MAX_STR_LEN); + cmd.params[cmd.n_params] = cmd.strings[cmd.s_p]; + cmd.s_p++; + cmd.n_params++; } else { - return 0; + (void)snprintf(cmd.strings[cmd.s_p], MAX_STR_LEN, "%s=%s", name, value); + cmd.envs[cmd.n_envs] = cmd.strings[cmd.s_p]; + cmd.s_p++; + cmd.n_envs++; } return 1; } int main(int argc, char const *argv[]) { - const char* conf_file; + const char *conf_file; item_t item; pid_t w_pid; int pid_count; + list_t pids; signal(SIGPIPE, SIG_IGN); - signal(SIGABRT, SIG_IGN); + signal(SIGABRT, SIG_IGN); signal(SIGINT, int_handler); - if(argc != 2) + if (argc != 2) { - printf("Usage: %s /path/to/conf.ini", argv[0]); + printf("Usage: %s /path/to/conf.ini\n", argv[0]); return -1; } conf_file = argv[1]; - + LOG_INIT(MODULE_NAME); + M_LOG(MODULE_NAME,"config file is %s", conf_file); pids = list_init(); - - if (ini_parse(conf_file, ini_handle, NULL) < 0) + (void)memset(&cmd, 0, sizeof(cmd)); + ASSERT(ini_parse(conf_file, ini_handle, &pids) == 0, "Can't load service from '%s'", conf_file); + if (cmd.params[0] != NULL) { - M_ERROR(MODULE_NAME, "Can't load '%s'", conf_file); - return -1; + execute_command(&pids); } pid_count = list_size(pids); // monitoring the process - while(running != 0 && pid_count != 0) + while (running != 0 && pid_count != 0) { pid_count = 0; list_for_each(item, pids) { - w_pid = waitpid((pid_t) item->value.i, NULL, WNOHANG); - if(w_pid == -1 || w_pid > 0) + w_pid = waitpid((pid_t)item->value.i, NULL, WNOHANG); + if (w_pid == -1 || w_pid > 0) { // child exits - M_LOG(MODULE_NAME, "Process %d exits\n", item->value.i); + M_LOG(MODULE_NAME, "Process %d exits", item->value.i); item->value.i = -1; } else @@ -128,9 +146,9 @@ int main(int argc, char const *argv[]) // kill all the remaining processes list_for_each(item, pids) { - if(item->value.i != -1) + if (item->value.i != -1) { - if(kill(item->value.i, SIGKILL) == - 1) + if (kill(item->value.i, SIGKILL) == -1) { M_ERROR(MODULE_NAME, "Unable to kill process %d: %s", item->value.i, strerror(errno)); } diff --git a/runner.ini b/runner.ini index 1221b69..32641cf 100644 --- a/runner.ini +++ b/runner.ini @@ -1,5 +1,21 @@ -[RUNNER] -service=/opt/www/bin/vterm /opt/www/tmp/channels/antd_hotline.sock -service=/opt/www/bin/wfifo /opt/www/tmp/channels/antd_hotline.sock notification /var/wfifo_notification r +[vterm] +exec = /opt/www/bin/vterm +param = /opt/www/tmp/channels/antd_hotline.sock +debug = 0 + +[notification_fifo] +exec = /opt/www/bin/wfifo +param = /opt/www/tmp/channels/antd_hotline.sock +param = notification +param = /var/wfifo_notification +param = r +debug = 1 + +[broadcast] +exec = /opt/www/bin/broadcast +param = /opt/www/tmp/channels/antd_hotline.sock +param = broadcast +debug = 1 + ;service=/opt/www/bin/wfifo /opt/www/tmp/channels/antd_hotline.sock server_to_client /var/wfifo_s2c r ;service=/opt/www/bin/wfifo /opt/www/tmp/channels/antd_hotline.sock client_to_server /var/wfifo_c2s w \ No newline at end of file diff --git a/tunnel.c b/tunnel.c index 5f597f2..2a6506d 100644 --- a/tunnel.c +++ b/tunnel.c @@ -67,6 +67,7 @@ static int msg_check_number(int fd, uint16_t number) M_ERROR(MODULE_NAME, "Unable to read integer value: %s", strerror(errno)); return -1; } + value = ntohs(value); if(number != value) { M_ERROR(MODULE_NAME, "Value mismatches: %04X, expected %04X", value, number); @@ -103,6 +104,7 @@ static uint8_t* msg_read_payload(int fd, uint32_t* size) M_ERROR(MODULE_NAME, "Unable to read payload data size: %s", strerror(errno)); return NULL; } + *size = ntohl(*size); if(*size <= 0) { return NULL; @@ -168,11 +170,13 @@ int msg_read(int fd, tunnel_msg_t* msg) M_ERROR(MODULE_NAME, "Unable to read msg channel id"); return -1; } + msg->header.channel_id = ntohs(msg->header.channel_id); if(guard_read(fd, &msg->header.client_id, sizeof(msg->header.client_id)) == -1) { M_ERROR(MODULE_NAME, "Unable to read msg client id"); return -1; } + msg->header.client_id = ntohs(msg->header.client_id); if((msg->data = msg_read_payload(fd, &msg->header.size)) == NULL && msg->header.size != 0) { M_ERROR(MODULE_NAME, "Unable to read msg payload data"); @@ -193,8 +197,10 @@ int msg_read(int fd, tunnel_msg_t* msg) int msg_write(int fd, tunnel_msg_t* msg) { // write begin magic number - uint16_t number = MSG_MAGIC_BEGIN; - if(guard_write(fd,&number, sizeof(number)) == -1) + uint16_t net16; + uint32_t net32; + net16 = htons(MSG_MAGIC_BEGIN); + if(guard_write(fd,&net16, sizeof(net16)) == -1) { M_ERROR(MODULE_NAME, "Unable to write begin magic number: %s", strerror(errno)); return -1; @@ -206,20 +212,22 @@ int msg_write(int fd, tunnel_msg_t* msg) return -1; } // write channel id - if(guard_write(fd,&msg->header.channel_id, sizeof(msg->header.channel_id)) == -1) + net16 = htons(msg->header.channel_id); + if(guard_write(fd,&net16, sizeof(msg->header.channel_id)) == -1) { M_ERROR(MODULE_NAME, "Unable to write msg channel id: %s", strerror(errno)); return -1; } //write client id - if(guard_write(fd,&msg->header.client_id, sizeof(msg->header.client_id)) == -1) + net16 = htons(msg->header.client_id); + if(guard_write(fd,&net16, sizeof(msg->header.client_id)) == -1) { M_ERROR(MODULE_NAME, "Unable to write msg client id: %s", strerror(errno)); return -1; } // write payload len - - if(guard_write(fd,&msg->header.size, sizeof(msg->header.size)) == -1) + net32 = htonl(msg->header.size); + if(guard_write(fd,&net32, sizeof(msg->header.size)) == -1) { M_ERROR(MODULE_NAME, "Unable to write msg payload length: %s", strerror(errno)); return -1; @@ -233,8 +241,8 @@ int msg_write(int fd, tunnel_msg_t* msg) return -1; } } - number = MSG_MAGIC_END; - if(guard_write(fd,&number, sizeof(number)) == -1) + net16 = htons(MSG_MAGIC_END); + if(guard_write(fd,&net16, sizeof(net16)) == -1) { M_ERROR(MODULE_NAME, "Unable to write end magic number: %s", strerror(errno)); return -1; diff --git a/tunnel.h b/tunnel.h index 377ceae..416ae86 100644 --- a/tunnel.h +++ b/tunnel.h @@ -1,6 +1,7 @@ #ifndef TUNNEL_H #define TUNNEL_H #include +#include #include "log.h" #define MAX_CHANNEL_PATH 108 diff --git a/v4l2cam/v4l2cam.c b/v4l2cam/v4l2cam.c index 753b847..eda3f0d 100644 --- a/v4l2cam/v4l2cam.c +++ b/v4l2cam/v4l2cam.c @@ -423,6 +423,7 @@ int main(const int argc, const char **argv) int status; fd_set fd_in; uint64_t expirations_count; + uint16_t net16; void *fargv[2]; unsigned int offset = 0; if (argc != 4) @@ -553,8 +554,10 @@ int main(const int argc, const char **argv) msg.header.type = CHANNEL_CTRL; msg.header.size = 6; msg.data = (uint8_t *)buff; - (void)memcpy(buff, &video_setting.width, sizeof(video_setting.width)); - (void)memcpy(buff + sizeof(video_setting.width), &video_setting.height, sizeof(video_setting.height)); + net16 = htons(video_setting.width); + (void)memcpy(buff, &net16, sizeof(video_setting.width)); + net16 = htons(video_setting.height); + (void)memcpy(buff + sizeof(video_setting.height), &net16, sizeof(video_setting.height)); buff[sizeof(video_setting.width) + sizeof(video_setting.height)] = video_setting.fps; buff[sizeof(video_setting.width) + sizeof(video_setting.height) + 1] = video_setting.jpeg_quality; if (msg_write(sock, &msg) == -1) @@ -574,8 +577,10 @@ int main(const int argc, const char **argv) { offset = 0; (void)memcpy(&video_setting.width, msg.data, 2); + video_setting.width = ntohs(video_setting.width); offset += 2; (void)memcpy(&video_setting.height, msg.data + offset, 2); + video_setting.height = ntohs(video_setting.height); offset += 2; (void)memcpy(&video_setting.fps, msg.data + offset, 1); offset++; @@ -604,8 +609,10 @@ int main(const int argc, const char **argv) msg.header.type = CHANNEL_CTRL; msg.header.size = 6; msg.data = (uint8_t *)buff; - (void)memcpy(buff, &video_setting.width, sizeof(video_setting.width)); - (void)memcpy(buff + sizeof(video_setting.width), &video_setting.height, sizeof(video_setting.height)); + net16 = htons(video_setting.width); + (void)memcpy(buff, &net16, sizeof(video_setting.width)); + net16 = htons(video_setting.height); + (void)memcpy(buff + sizeof(video_setting.height), &net16, sizeof(video_setting.height)); buff[sizeof(video_setting.width) + sizeof(video_setting.height)] = video_setting.fps; buff[sizeof(video_setting.width) + sizeof(video_setting.height) + 1] = video_setting.jpeg_quality; fargv[0] = (void *)&msg; diff --git a/vterm/vterm.c b/vterm/vterm.c index 982889a..acda558 100644 --- a/vterm/vterm.c +++ b/vterm/vterm.c @@ -17,37 +17,39 @@ #include #include "../tunnel.h" -#define MODULE_NAME "vterm" +#define MODULE_NAME "vterm" -typedef struct{ - int fdm; - pid_t pid; - int cid; +typedef struct +{ + int fdm; + pid_t pid; + int cid; } vterm_proc_t; -static bst_node_t* processes = NULL; +static bst_node_t *processes = NULL; static volatile int running = 1; -static void int_handler(int dummy) { - (void) dummy; +static void int_handler(int dummy) +{ + (void)dummy; running = 0; } -static vterm_proc_t* terminal_new(const char* user) +static vterm_proc_t *terminal_new(const char *user) { int fdm, fds, rc; pid_t pid; - vterm_proc_t* proc = NULL; + vterm_proc_t *proc = NULL; char cmd[64]; (void)memset(cmd, 0, sizeof(cmd)); - if(user && strlen(user) > 0) + if (user && strlen(user) > 0) { - snprintf(cmd, sizeof(cmd),"TERM=linux su -l %s", user); + snprintf(cmd, sizeof(cmd), "TERM=linux su -l %s", user); } else { - snprintf(cmd, sizeof(cmd),"TERM=linux login"); + snprintf(cmd, sizeof(cmd), "TERM=linux login"); } // Check arguments fdm = posix_openpt(O_RDWR); @@ -56,7 +58,7 @@ static vterm_proc_t* terminal_new(const char* user) M_LOG(MODULE_NAME, "Error on posix_openpt(): %s\n", strerror(errno)); return NULL; } - + rc = grantpt(fdm); if (rc != 0) { @@ -69,16 +71,16 @@ static vterm_proc_t* terminal_new(const char* user) M_LOG(MODULE_NAME, "Error on unlockpt(): %s\n", strerror(errno)); return NULL; } - + // Open the slave side ot the PTY fds = open(ptsname(fdm), O_RDWR); - + // Create the child process pid = fork(); if (pid) { // parent - proc = (vterm_proc_t*)malloc(sizeof(vterm_proc_t)); + proc = (vterm_proc_t *)malloc(sizeof(vterm_proc_t)); proc->fdm = fdm; proc->pid = pid; return proc; @@ -87,40 +89,40 @@ static vterm_proc_t* terminal_new(const char* user) { //struct termios slave_orig_term_settings; // Saved terminal settings //struct termios new_term_settings; // Current terminal settings - + // CHILD - + // Close the master side of the PTY close(fdm); - + // Save the defaults parameters of the slave side of the PTY //rc = tcgetattr(fds, &slave_orig_term_settings); - + // Set RAW mode on slave side of PTY //new_term_settings = slave_orig_term_settings; //cfmakeraw (&new_term_settings); //tcsetattr (fds, TCSANOW, &new_term_settings); - + // The slave side of the PTY becomes the standard input and outputs of the child process // we use cook mode here close(0); // Close standard input (current terminal) close(1); // Close standard output (current terminal) close(2); // Close standard error (current terminal) - + rc = dup(fds); // PTY becomes standard input (0) rc = dup(fds); // PTY becomes standard output (1) rc = dup(fds); // PTY becomes standard error (2) - + // Now the original file descriptor is useless close(fds); - + // Make the current process a new session leader setsid(); - + // As the child is a session leader, set the controlling terminal to be the slave side of the PTY // (Mandatory for programs like the shell to make them manage correctly their outputs) ioctl(0, TIOCSCTTY, 1); - + //system("/bin/bash"); rc = system(cmd); //M_LOG("%s\n","Terminal exit"); @@ -131,16 +133,16 @@ static vterm_proc_t* terminal_new(const char* user) static void terminal_kill(int client_id, int should_delete) { // find the proc - bst_node_t* node = bst_find(processes, client_id); - vterm_proc_t* proc; - if(node != NULL) + bst_node_t *node = bst_find(processes, client_id); + vterm_proc_t *proc; + if (node != NULL) { - proc = (vterm_proc_t*)node->data; - if(proc != NULL) + proc = (vterm_proc_t *)node->data; + if (proc != NULL) { - (void) close(proc->fdm); + (void)close(proc->fdm); M_LOG(MODULE_NAME, "Kill the process %d", proc->pid); - if(kill(proc->pid, SIGKILL) == - 1) + if (kill(proc->pid, SIGKILL) == -1) { M_ERROR(MODULE_NAME, "Unable to kill process %d: %s", proc->pid, strerror(errno)); } @@ -149,25 +151,24 @@ static void terminal_kill(int client_id, int should_delete) (void)waitpid(proc->pid, NULL, 0); } free(node->data); - if(should_delete) + if (should_delete) processes = bst_delete(processes, node->key); // wait child - } } } -static int terminal_write(tunnel_msg_t* msg) +static int terminal_write(tunnel_msg_t *msg) { // TODO: control frame e.g. for window resize - bst_node_t* node = bst_find(processes, msg->header.client_id); - vterm_proc_t* proc; - if(node != NULL) + bst_node_t *node = bst_find(processes, msg->header.client_id); + vterm_proc_t *proc; + if (node != NULL) { - proc = (vterm_proc_t*)node->data; - if(proc != NULL) + proc = (vterm_proc_t *)node->data; + if (proc != NULL) { - if(write(proc->fdm, msg->data, msg->header.size) == -1) + if (write(proc->fdm, msg->data, msg->header.size) == -1) { M_ERROR(MODULE_NAME, "Unable to write data to the terminal corresponding to client %d", msg->header.client_id); return -1; @@ -187,71 +188,71 @@ static int terminal_write(tunnel_msg_t* msg) return 0; } -static void unsubscribe(bst_node_t* node, void** args, int argc) +static void unsubscribe(bst_node_t *node, void **args, int argc) { - (void) argc; + (void)argc; tunnel_msg_t msg; - int* ufd = (int*) args[0]; - vterm_proc_t* proc = (vterm_proc_t*) node->data; - if(proc != NULL) + int *ufd = (int *)args[0]; + vterm_proc_t *proc = (vterm_proc_t *)node->data; + if (proc != NULL) { msg.header.type = CHANNEL_UNSUBSCRIBE; - msg.header.client_id = proc->cid; - msg.header.size = 0; - terminal_kill(proc->cid, 0); - if(msg_write(*ufd, &msg) == -1) - { - M_ERROR(MODULE_NAME, "Unable to request unsubscribe to client %d", proc->cid); - } + msg.header.client_id = proc->cid; + msg.header.size = 0; + terminal_kill(proc->cid, 0); + if (msg_write(*ufd, &msg) == -1) + { + M_ERROR(MODULE_NAME, "Unable to request unsubscribe to client %d", proc->cid); + } } } -static void set_sock_fd(bst_node_t* node, void** args, int argc) +static void set_sock_fd(bst_node_t *node, void **args, int argc) { - (void) argc; + (void)argc; tunnel_msg_t msg; pid_t wpid; - fd_set* fd_in = (fd_set*) args[1]; - int* max_fd = (int*)args[2]; - list_t* list_p = (list_t*) args[3]; - int* ufd = (int*) args[0]; - - vterm_proc_t* proc = (vterm_proc_t*) node->data; - - if(proc != NULL) + fd_set *fd_in = (fd_set *)args[1]; + int *max_fd = (int *)args[2]; + list_t *list_p = (list_t *)args[3]; + int *ufd = (int *)args[0]; + + vterm_proc_t *proc = (vterm_proc_t *)node->data; + + if (proc != NULL) { // monitor the pid wpid = waitpid(proc->pid, NULL, WNOHANG); - if(wpid == -1 || wpid > 0) - { - // child exits - M_LOG(MODULE_NAME, "Terminal linked to client %d exits\n", proc->cid); - unsubscribe(node, args, argc); - list_put_ptr(list_p, node); - } - else - { - FD_SET(proc->fdm, fd_in); - if(*max_fd < proc->fdm) + if (wpid == -1 || wpid > 0) + { + // child exits + M_LOG(MODULE_NAME, "Terminal linked to client %d exits\n", proc->cid); + unsubscribe(node, args, argc); + list_put_ptr(list_p, node); + } + else + { + FD_SET(proc->fdm, fd_in); + if (*max_fd < proc->fdm) { *max_fd = proc->fdm; } - } + } } } -static void terminal_monitor(bst_node_t* node, void** args, int argc) +static void terminal_monitor(bst_node_t *node, void **args, int argc) { - (void) argc; - int* ufd = (int*) args[0]; - fd_set* fd_in = (fd_set*) args[1]; - list_t* list = (list_t*) args[3]; + (void)argc; + int *ufd = (int *)args[0]; + fd_set *fd_in = (fd_set *)args[1]; + list_t *list = (list_t *)args[3]; char buff[BUFFLEN]; tunnel_msg_t msg; int rc; - vterm_proc_t* proc = (vterm_proc_t*) node->data; - - if(proc != NULL && FD_ISSET(proc->fdm, fd_in)) + vterm_proc_t *proc = (vterm_proc_t *)node->data; + + if (proc != NULL && FD_ISSET(proc->fdm, fd_in)) { if ((rc = read(proc->fdm, buff, BUFFLEN)) > 0) { @@ -260,10 +261,10 @@ static void terminal_monitor(bst_node_t* node, void** args, int argc) msg.header.type = CHANNEL_DATA; msg.header.size = rc; msg.data = buff; - if(msg_write(*ufd, &msg) == -1) + if (msg_write(*ufd, &msg) == -1) { terminal_kill(node->key, 0); - M_ERROR(MODULE_NAME,"Unable to send data to client %d", msg.header.client_id); + M_ERROR(MODULE_NAME, "Unable to send data to client %d", msg.header.client_id); list_put_ptr(list, node); } } @@ -271,9 +272,9 @@ static void terminal_monitor(bst_node_t* node, void** args, int argc) { if (rc < 0) { - M_LOG(MODULE_NAME, "Error on read standard input: %s\n", strerror(errno)); - terminal_kill(node->key, 0); - list_put_ptr(list, node); + M_LOG(MODULE_NAME, "Error on read standard input: %s\n", strerror(errno)); + terminal_kill(node->key, 0); + list_put_ptr(list, node); } } } @@ -282,11 +283,11 @@ static void terminal_monitor(bst_node_t* node, void** args, int argc) static void terminal_resize(int cid, int col, int row) { struct winsize win = {0, 0, 0, 0}; - bst_node_t* node = bst_find(processes, cid); - vterm_proc_t* proc; - if(node != NULL) + bst_node_t *node = bst_find(processes, cid); + vterm_proc_t *proc; + if (node != NULL) { - proc = (vterm_proc_t*) node->data; + proc = (vterm_proc_t *)node->data; if (ioctl(proc->fdm, TIOCGWINSZ, &win) != 0) { if (errno != EINVAL) @@ -304,39 +305,39 @@ static void terminal_resize(int cid, int col, int row) if (ioctl(proc->fdm, TIOCSWINSZ, (char *)&win) != 0) M_ERROR(MODULE_NAME, "Unable to set terminal window size process linked to client %d: %s", cid, strerror(errno)); - } + } else { M_ERROR(MODULE_NAME, "Unable to find the terminal process linked to client %d", cid); } } -int main(int argc, char** argv) +int main(int argc, char **argv) { int fd; tunnel_msg_t msg; fd_set fd_in; int status, maxfd; struct timeval timeout; - char buff[MAX_CHANNEL_NAME+1]; + char buff[MAX_CHANNEL_NAME + 1]; void *args[4]; list_t list; item_t item; int ncol, nrow; - + LOG_INIT(MODULE_NAME); - if(argc != 2) + if (argc != 2) { printf("Usage: %s path/to/hotline/socket\n", argv[0]); return -1; } signal(SIGPIPE, SIG_IGN); - signal(SIGABRT, SIG_IGN); + signal(SIGABRT, SIG_IGN); signal(SIGINT, int_handler); M_LOG(MODULE_NAME, "Hotline is: %s", argv[1]); // now try to request new channel from hotline fd = open_unix_socket(argv[1]); - if(fd == -1) + if (fd == -1) { M_ERROR(MODULE_NAME, "Unable to open the hotline: %s", argv[1]); return -1; @@ -345,168 +346,172 @@ int main(int argc, char** argv) msg.header.channel_id = 0; msg.header.client_id = 0; M_LOG(MODULE_NAME, "Request to open the channel %s", MODULE_NAME); - (void)strncpy(buff, MODULE_NAME,MAX_CHANNEL_NAME); + (void)strncpy(buff, MODULE_NAME, MAX_CHANNEL_NAME); msg.header.size = strlen(buff); - msg.data = (uint8_t*) buff; - if(msg_write(fd, &msg) == -1) + msg.data = (uint8_t *)buff; + if (msg_write(fd, &msg) == -1) { M_ERROR(MODULE_NAME, "Unable to write message to hotline"); - (void) close(fd); + (void)close(fd); return -1; } M_LOG(MODULE_NAME, "Wait for comfirm creation of %s", MODULE_NAME); // now wait for message - if(msg_read(fd, &msg) == -1) + if (msg_read(fd, &msg) == -1) { M_ERROR(MODULE_NAME, "Unable to read message from hotline"); - (void) close(fd); + (void)close(fd); return -1; } - if(msg.header.type == CHANNEL_OK) + if (msg.header.type == CHANNEL_OK) { M_LOG(MODULE_NAME, "Channel created: %s", MODULE_NAME); - if(msg.data) + if (msg.data) free(msg.data); } else { M_ERROR(MODULE_NAME, "Channel is not created: %s. Tunnel service responds with msg of type %d", MODULE_NAME, msg.header.type); - if(msg.data) + if (msg.data) free(msg.data); running = 0; } // now read data - while(running) + while (running) { FD_ZERO(&fd_in); FD_SET(fd, &fd_in); maxfd = fd; - + // monitor processes list = list_init(); - args[1] = (void*) &fd_in; - args[2] = (void*) &maxfd; - args[3] = (void*) &list; - args[0] = (void*) &fd; + args[1] = (void *)&fd_in; + args[2] = (void *)&maxfd; + args[3] = (void *)&list; + args[0] = (void *)&fd; bst_for_each(processes, set_sock_fd, args, 4); list_for_each(item, list) { - processes = bst_delete(processes, ((bst_node_t*)(item->value.ptr))->key); + processes = bst_delete(processes, ((bst_node_t *)(item->value.ptr))->key); item->value.ptr = NULL; } list_free(&list); - + status = select(maxfd + 1, &fd_in, NULL, NULL, NULL); - + switch (status) - { - case -1: - M_LOG(MODULE_NAME, "Error %d on select()\n", errno); - running = 0; - break; - case 0: - break; - // we have data - default: - if (FD_ISSET(fd, &fd_in)) + { + case -1: + M_LOG(MODULE_NAME, "Error %d on select()\n", errno); + running = 0; + break; + case 0: + break; + // we have data + default: + if (FD_ISSET(fd, &fd_in)) + { + if (msg_read(fd, &msg) == -1) { - if(msg_read(fd, &msg) == -1) - { - M_ERROR(MODULE_NAME, "Unable to read message from channel. quit"); - running = 0; - } - else - { - switch (msg.header.type) - { - case CHANNEL_SUBSCRIBE: - M_LOG(MODULE_NAME, "Client %d subscribes to the chanel with user [%s]", msg.header.client_id, msg.data); - // create new process - vterm_proc_t* proc = terminal_new(msg.data); - if(proc == NULL) - { - M_ERROR(MODULE_NAME, "Unable to create new terminal for client %d", msg.header.client_id); - // unsubscribe client - msg.header.type = CHANNEL_UNSUBSCRIBE; - msg.header.size = 0; - if(msg_write(fd, &msg) == -1) - { - M_LOG(MODULE_NAME,"Unable to request unsubscribe client %d", msg.header.client_id); - } - } - else - { - proc->cid = msg.header.client_id; - // insert new terminal to the list - processes = bst_insert(processes, msg.header.client_id, proc); - } - break; - - case CHANNEL_UNSUBSCRIBE: - M_LOG(MODULE_NAME, "Client %d unsubscribes to the chanel", msg.header.client_id); - terminal_kill(msg.header.client_id, 1); - break; - - case CHANNEL_CTRL: - if(msg.header.size == 8) - { - (void)memcpy(&ncol, msg.data, sizeof(ncol)); - (void)memcpy(&nrow, msg.data + sizeof(ncol), sizeof(nrow)); - M_LOG(MODULE_NAME, "Client %d request terminal window resize of (%d,%d)", msg.header.client_id, ncol, nrow); - terminal_resize(msg.header.client_id, ncol, nrow); - - } - else - { - M_ERROR(MODULE_NAME, "Invalid control message size: %d from client %d, expected 8", msg.header.size, msg.header.client_id); - } - - break; - - case CHANNEL_DATA: - if(terminal_write(&msg) == -1) - { - M_ERROR(MODULE_NAME, "Unable to write data to terminal corresponding to client %d", msg.header.client_id); - terminal_kill(msg.header.client_id, 1); - msg.header.type = CHANNEL_UNSUBSCRIBE; - msg.header.size = 0; - if(msg_write(fd, &msg) == -1) - { - M_LOG(MODULE_NAME,"Unable to request unsubscribe client %d", msg.header.client_id); - } - } - break; - - default: - M_LOG(MODULE_NAME, "Client %d send message of type %d", - msg.header.client_id, msg.header.type); - break; - } - if(msg.data) - { - free(msg.data); - } - } + M_ERROR(MODULE_NAME, "Unable to read message from channel. quit"); + running = 0; } else { - // on the processes side - list = list_init(); - bst_for_each(processes, terminal_monitor, args, 4); - list_for_each(item, list) + switch (msg.header.type) { - processes = bst_delete(processes, ((bst_node_t*)(item->value.ptr))->key); - item->value.ptr = NULL; + case CHANNEL_SUBSCRIBE: + M_LOG(MODULE_NAME, "Client %d subscribes to the chanel with user [%s]", msg.header.client_id, msg.data); + // create new process + vterm_proc_t *proc = terminal_new(msg.data); + if (proc == NULL) + { + M_ERROR(MODULE_NAME, "Unable to create new terminal for client %d", msg.header.client_id); + // unsubscribe client + msg.header.type = CHANNEL_UNSUBSCRIBE; + msg.header.size = 0; + if (msg_write(fd, &msg) == -1) + { + M_LOG(MODULE_NAME, "Unable to request unsubscribe client %d", msg.header.client_id); + } + } + else + { + proc->cid = msg.header.client_id; + // insert new terminal to the list + processes = bst_insert(processes, msg.header.client_id, proc); + } + break; + + case CHANNEL_UNSUBSCRIBE: + M_LOG(MODULE_NAME, "Client %d unsubscribes to the chanel", msg.header.client_id); + terminal_kill(msg.header.client_id, 1); + break; + + case CHANNEL_CTRL: + if (msg.header.size == 8) + { + (void)memcpy(&ncol, msg.data, sizeof(ncol)); + (void)memcpy(&nrow, msg.data + sizeof(ncol), sizeof(nrow)); + ncol = ntohl(ncol); + nrow = ntohl(nrow); + M_LOG(MODULE_NAME, "Client %d request terminal window resize of (%d,%d)", msg.header.client_id, ncol, nrow); + terminal_resize(msg.header.client_id, ncol, nrow); + } + else + { + M_ERROR(MODULE_NAME, "Invalid control message size: %d from client %d, expected 8", msg.header.size, msg.header.client_id); + } + + break; + + case CHANNEL_DATA: + if (terminal_write(&msg) == -1) + { + M_ERROR(MODULE_NAME, "Unable to write data to terminal corresponding to client %d", msg.header.client_id); + terminal_kill(msg.header.client_id, 1); + msg.header.type = CHANNEL_UNSUBSCRIBE; + msg.header.size = 0; + if (msg_write(fd, &msg) == -1) + { + M_LOG(MODULE_NAME, "Unable to request unsubscribe client %d", msg.header.client_id); + } + } + break; + + default: + M_LOG(MODULE_NAME, "Client %d send message of type %d", + msg.header.client_id, msg.header.type); + break; + } + if (msg.data) + { + free(msg.data); } - list_free(&list); } - + } + else + { + // on the processes side + list = list_init(); + bst_for_each(processes, terminal_monitor, args, 4); + list_for_each(item, list) + { + processes = bst_delete(processes, ((bst_node_t *)(item->value.ptr))->key); + if (((bst_node_t *)(item->value.ptr))->data) + { + free(((bst_node_t *)(item->value.ptr))->data); + } + item->value.ptr = NULL; + } + list_free(&list); + } } } // unsubscribe all clients - args[0] = (void*) &fd; + args[0] = (void *)&fd; bst_for_each(processes, unsubscribe, args, 1); (void)bst_free(processes); // close the channel @@ -514,13 +519,13 @@ int main(int argc, char** argv) msg.header.type = CHANNEL_CLOSE; msg.header.size = 0; msg.data = NULL; - if( msg_write(fd, &msg) == -1) + if (msg_write(fd, &msg) == -1) { M_ERROR(MODULE_NAME, "Unable to request channel close"); } // close all opened terminal - + (void)msg_read(fd, &msg); - (void) close(fd); + (void)close(fd); return 0; } \ No newline at end of file