From 29c3cffbc5f15dc688c289c151fecf08784038b6 Mon Sep 17 00:00:00 2001 From: lxsang Date: Mon, 3 Aug 2020 19:49:55 +0200 Subject: [PATCH] working with application via the shared socket --- Makefile.am | 4 +- bst.c | 16 ++-- tunnel.c | 206 ++++++++++++++++++++++++++++------------------------ 3 files changed, 121 insertions(+), 105 deletions(-) diff --git a/Makefile.am b/Makefile.am index 01ba2df..2d1e7ca 100644 --- a/Makefile.am +++ b/Makefile.am @@ -13,7 +13,7 @@ endif AM_CPPFLAGS += -W -Wall -g -std=c99 -fPIC lib_LTLIBRARIES = tunnel.la -wterm_la_LDFLAGS = -module -avoid-version -shared -wterm_la_SOURCES = tunnel.c bst.c +tunnel_la_LDFLAGS = -module -avoid-version -shared +tunnel_la_SOURCES = tunnel.c bst.c EXTRA_DIST = README.md \ No newline at end of file diff --git a/bst.c b/bst.c index 429836e..c6d0b63 100644 --- a/bst.c +++ b/bst.c @@ -23,9 +23,11 @@ bst_node_t* bst_insert(bst_node_t* root, int key, void* data) root->left = root->right = NULL; } else if(key < root->key) - root->left = insert(root->left, key, data); + root->left = bst_insert(root->left, key, data); else if(key > root->key) - root->right = insert(root->right, key, data); + root->right = bst_insert(root->right, key, data); + else + root->data = data; return root; } @@ -54,9 +56,9 @@ bst_node_t* bst_find(bst_node_t* root, int x) if(root == NULL) return NULL; else if(x < root->key) - return find(root->left, x); + return bst_find(root->left, x); else if(x > root->key) - return find(root->right, x); + return bst_find(root->right, x); else return root; } @@ -68,15 +70,15 @@ bst_node_t* bst_delete(bst_node_t* root, int x) if(root == NULL) return NULL; else if(x < root->key) - root->left = delete(root->left, x); + root->left = bst_delete(root->left, x); else if(x > root->key) - root->right = delete(root->right, x); + root->right = bst_delete(root->right, x); else if(root->left && root->right) { temp = bst_find_min(root->right); root->key = temp->key; root->data = temp->data; - root->right = delete(root->right, root->key); + root->right = bst_delete(root->right, root->key); } else { diff --git a/tunnel.c b/tunnel.c index c20b9af..0eacc9a 100644 --- a/tunnel.c +++ b/tunnel.c @@ -10,6 +10,7 @@ #include #include #include +#include #include "bst.h" #define MAX_CHANNEL_PATH 108 @@ -30,7 +31,6 @@ //#define CHANNEL_LIST (uint8_t)0x7 typedef struct { int sock; - char sun_path[MAX_CHANNEL_PATH]; char name[MAX_CHANNEL_NAME]; bst_node_t* subscribers; } antd_tunnel_channel_t; @@ -44,7 +44,7 @@ typedef struct { typedef struct{ antd_tunnel_msg_h_t header; - uint8_t data; + uint8_t* data; } antd_tunnel_msg_t; /** * Message is sent in the following format @@ -57,6 +57,7 @@ typedef struct { pthread_t tid; int hotline; uint32_t id_allocator; + uint8_t initialized; } antd_tunnel_t; static antd_tunnel_t g_tunnel; @@ -72,7 +73,7 @@ static int mk_socket(const char* name, char* path) if(!_exist(path)) { LOG("Socket dir does not exist, create it: %s",path); - if(mkdir(path, 0700) == -1) + if(mkdir(path, 0755) == -1) { ERROR("Unable to create socket dir: %s =", strerror(errno)); return -1; @@ -85,7 +86,7 @@ static int mk_socket(const char* name, char* path) ERROR("Socket file path exceeds the maximal size of: %d", MAX_CHANNEL_PATH); return -1; } - (void)strcat(path, HOT_LINE_SOCKET); + (void)strcat(path, name); (void) strncpy(address.sun_path, path, sizeof(address.sun_path)); @@ -106,7 +107,7 @@ static int mk_socket(const char* name, char* path) ERROR("Unable to listen to socket: %d (%s): %s",fd, path , strerror(errno)); return -1; } - LOG("Socket %s is created successfully", path); + LOG("Socket %s is created successfully: %d", path, fd); return fd; } @@ -115,7 +116,7 @@ static int msg_check_number(int fd, int number) int value; if(read(fd,&value,sizeof(value)) == -1) { - ERROR("Unable to read integer value: %s", strerror(errno)); + ERROR("Unable to read integer value on socket %d: %s", fd, strerror(errno)); return -1; } if(number != value) @@ -146,20 +147,20 @@ static int msg_read_string(int fd, char* buffer, uint8_t max_length) return 0; } -static char* msg_read_payload(int fd, int* size) +static uint8_t* msg_read_payload(int fd, int* size) { - char* data; + uint8_t* data; if(read(fd,size,sizeof(*size)) == -1) { ERROR("Unable to read payload data size: %s", strerror(errno)); return NULL; } - if(size <= 0) + if(*size <= 0) { return NULL; } - data = (char*) malloc(*size); + data = (uint8_t*) malloc(*size); if(data == NULL) { ERROR("Unable to allocate memory for payload data: %s", strerror(errno)); @@ -178,7 +179,7 @@ static int msg_read(int fd, antd_tunnel_msg_t* msg) { if(msg_check_number(fd, MSG_MAGIC_BEGIN) == -1) { - ERROR("Unable to check begin magic number"); + ERROR("Unable to check begin magic number on socket: %d", fd); return -1; } if(read(fd,&msg->header.type,sizeof(msg->header.type)) == -1) @@ -201,7 +202,7 @@ static int msg_read(int fd, antd_tunnel_msg_t* msg) ERROR("Unable to read msg client id"); return -1; } - if((msg->data = msg_read_payload(fd, &msg->header.size)) == NULL) + if((msg->data = msg_read_payload(fd, &msg->header.size)) == NULL && msg->header.size != 0) { ERROR("Unable to read msg payload data"); return -1; @@ -270,13 +271,16 @@ static int msg_write(int fd, antd_tunnel_msg_t* msg) } static void destroy_channel(antd_tunnel_channel_t* channel) { + if(channel == NULL) + return; if(channel->sock != -1) { (void) close(channel->sock); - (void)unlink(channel->sun_path); + channel->sock = -1; } /** TODO: send message to all subcribers before close*/ bst_free(channel->subscribers); + free(channel); } static void channel_open(int fd, const char* name) { @@ -285,7 +289,7 @@ static void channel_open(int fd, const char* name) bst_node_t* node; int hash_val = simple_hash(name); antd_tunnel_msg_t msg; - msg.data = buffer; + msg.data = (uint8_t*)buffer; msg.header.channel_id = 0; msg.header.client_id = 0; // look if the channel is already opened @@ -296,19 +300,24 @@ static void channel_open(int fd, const char* name) pthread_mutex_unlock(&g_tunnel.lock); if(node != NULL) { - snprintf(buffer, BUFFLEN, "Cannot open new channel: channel %s exists", name); - LOG("%s", buffer); - msg.header.type = CHANNEL_ERROR; - msg.header.size = strlen(buffer); - if(msg_write(fd, &msg) == -1) + channel = (antd_tunnel_channel_t*) node->data; + if(channel != NULL && channel->sock != -1) { - ERROR("Unable to write message to hotline"); + snprintf(buffer, BUFFLEN, "Cannot open new channel: channel %s exists", name); + LOG("%s", buffer); + msg.header.type = CHANNEL_ERROR; + msg.header.size = strlen(buffer); + if(msg_write(fd, &msg) == -1) + { + ERROR("Unable to write message to channel %s (%d)", channel->name, channel->sock); + } + return; } - return; } } // create new channel channel = (antd_tunnel_channel_t*)malloc(sizeof(antd_tunnel_channel_t)); + channel->subscribers = NULL; if(channel == NULL) { snprintf(buffer, BUFFLEN, "Unable to allocate new memory for new channel"); @@ -323,28 +332,15 @@ static void channel_open(int fd, const char* name) } // create socket file (void)strncpy(channel->name, name, MAX_CHANNEL_NAME); - channel->sock = mk_socket(name, channel->sun_path); - if(channel->sock == -1) - { - snprintf(buffer, BUFFLEN, "Unable to create socket"); - msg.header.type = CHANNEL_ERROR; - msg.header.size = strlen(buffer); - free(channel); - if(msg_write(fd, &msg) == -1) - { - ERROR("Unable to write message to hotline"); - } - return; - } + channel->sock = fd; // response with ok message msg.header.type = CHANNEL_OK; msg.header.channel_id = hash_val; - msg.header.size = strlen(channel->sun_path); - msg.data = channel->sun_path; + msg.header.size = 0; if(msg_write(fd, &msg) == -1) { destroy_channel(channel); - ERROR("Unable to write message to hotline"); + ERROR("Unable to write message to hotline (%d)", fd); } // channel created pthread_mutex_lock(&g_tunnel.lock); @@ -352,49 +348,51 @@ static void channel_open(int fd, const char* name) pthread_mutex_unlock(&g_tunnel.lock); } -static void channel_close(int fd, const char* name) +static void channel_close(antd_tunnel_channel_t* channel) { - char buffer[BUFFLEN]; antd_tunnel_msg_t msg; - bst_node_t* node; - int hash_val = simple_hash(name); - msg.data = buffer; + msg.data = NULL; msg.header.channel_id = 0; msg.header.client_id = 0; // look for the channel if(g_tunnel.channels != NULL) { - pthread_mutex_lock(&g_tunnel.lock); - node = bst_find(g_tunnel.channels, hash_val); - if(node) + msg.header.channel_id = msg.header.channel_id; + if(channel != NULL) { - destroy_channel((antd_tunnel_channel_t*)node->data); + msg.header.type = CHANNEL_OK; + msg.header.size = 0; + if(msg_write(channel->sock, &msg) == -1) + { + ERROR("Unable to write message to channel %s (%d)", channel->name, channel->sock); + } + LOG("Close channel: %s (%d)", channel->name, channel->sock); + destroy_channel(channel); } - msg.header.channel_id = node->key; - g_tunnel.channels = bst_delete(g_tunnel.channels, node->key); - pthread_mutex_unlock(&g_tunnel.lock); - } - msg.header.type = CHANNEL_OK; - msg.header.size = 0; - if(msg_write(g_tunnel.hotline, &msg) == -1) - { - ERROR("Unable to write message to hotline"); + //g_tunnel.channels = bst_delete(g_tunnel.channels, node->key); } } -static void monitor_hotline(int fd) +static void monitor_hotline(int listen_fd) { char buff[MAX_CHANNEL_NAME+1]; antd_tunnel_msg_t msg; + int fd; + fd = accept(listen_fd, NULL, NULL); + if (fd < 0) + { + ERROR("Unable to accept the new connection: %s", strerror(errno)); + return; + } if(msg_read(fd, &msg) == -1) { ERROR("Unable to read message from hotline"); + (void) close(fd); return; } switch (msg.header.type) { case CHANNEL_OPEN: - case CHANNEL_CLOSE: // get channel name if(msg.header.size > MAX_CHANNEL_NAME) { @@ -404,7 +402,7 @@ static void monitor_hotline(int fd) (void) snprintf(buff, MAX_CHANNEL_NAME, "Channel name exceeds %d bytes", MAX_CHANNEL_NAME); LOG("%s", buff); msg.header.size = strlen(buff); - msg.data = buff; + msg.data = (uint8_t*)buff; if(msg_write(fd, &msg) == -1) { ERROR("Unable to write error to hotline"); @@ -414,14 +412,8 @@ static void monitor_hotline(int fd) { (void)memcpy(buff, msg.data, msg.header.size); buff[msg.header.size] = '\0'; - if(msg.header.type == CHANNEL_OPEN) - { - channel_open(fd, buff); - } - else - { - channel_close(fd, buff); - } + LOG("Open a new channel: %s (%d)", buff, fd); + channel_open(fd, buff); } break; @@ -429,7 +421,7 @@ static void monitor_hotline(int fd) msg.header.type = CHANNEL_ERROR; (void) snprintf(buff, MAX_CHANNEL_NAME, "Unsupported msg type %d in hotline", (int)msg.header.type); msg.header.size = strlen(buff); - msg.data = buff; + msg.data = (uint8_t*)buff; LOG("%s", buff); if(msg_write(fd, &msg) == -1) { @@ -501,12 +493,23 @@ static void handle_channel(bst_node_t* node, void** args, int argc) antd_tunnel_channel_t* channel = (antd_tunnel_channel_t*) node->data; bst_node_t * client; antd_client_t* rq; - if(channel->sock != -1 && FD_ISSET(channel->sock, fd_in)) + int n; + if(channel != NULL && channel->sock != -1 && FD_ISSET(channel->sock, fd_in)) { + ioctl(channel->sock, FIONREAD, &n); + if(n == 0) + { + // the socket is closed + LOG("Channel %s (%d) is closed by application", channel->name, channel->sock); + destroy_channel(channel); + node->data = NULL; + return; + } + LOG("Got new data on channel %s (%d)", channel->name, channel->sock); // handle msg read if(msg_read(channel->sock, &msg) == -1) { - ERROR("Unable to read message from hotline"); + ERROR("Unable to read message from channel %s (%d)", channel->name, channel->sock); return; } switch (msg.header.type) @@ -515,6 +518,7 @@ static void handle_channel(bst_node_t* node, void** args, int argc) case CHANNEL_ERROR: case CHANNEL_DATA: // forward message to the correct client in the channel + msg.header.channel_id = node->key; client = bst_find(channel->subscribers, msg.header.client_id); if(client != NULL) { @@ -529,6 +533,11 @@ static void handle_channel(bst_node_t* node, void** args, int argc) ERROR("Unable to find client %d to write on channel %s", msg.header.client_id, channel->name); } break; + case CHANNEL_CLOSE: + // close the current channel + channel_close(channel); + node->data = NULL; + break; default: LOG("Message type %d is not supported in client-application communication", msg.header.type); break; @@ -541,7 +550,7 @@ static void set_sock_fd(bst_node_t* node, void** args, int argc) fd_set* fd_in = (fd_set*) args[0]; int* max_fd = args[1]; antd_tunnel_channel_t* channel = (antd_tunnel_channel_t*) node->data; - if(channel->sock != -1) + if(channel != NULL && channel->sock != -1) { FD_SET(channel->sock, fd_in); if(*max_fd < channel->sock) @@ -550,17 +559,15 @@ static void set_sock_fd(bst_node_t* node, void** args, int argc) } } } -static void multiplex(antd_tunnel_t* tunnel_p) +static void* multiplex(void* data_p) { - int max_fdm, status; + int max_fdm; fd_set fd_in; int status = 0; struct timeval timeout; int rc; - size_t i = 0; - chain_t it; - antd_tunnel_channel_t* channel; void *args[2]; + antd_tunnel_t* tunnel_p = (antd_tunnel_t*) data_p; while(status == 0) { timeout.tv_sec = 0; @@ -588,6 +595,7 @@ static void multiplex(antd_tunnel_t* tunnel_p) default: if(FD_ISSET(tunnel_p->hotline, &fd_in)) { + LOG("Got new data on hotline"); monitor_hotline(tunnel_p->hotline); } pthread_mutex_lock(&tunnel_p->lock); @@ -596,6 +604,7 @@ static void multiplex(antd_tunnel_t* tunnel_p) pthread_mutex_unlock(&tunnel_p->lock); } } + return NULL; } void init() @@ -607,20 +616,22 @@ void init() g_tunnel.hotline = -1; g_tunnel.channels = NULL; g_tunnel.id_allocator = 0; + g_tunnel.initialized = 0; - if((g_tunnel.hotline = mk_socket(HOT_LINE_SOCKET, path) == -1)) + if((g_tunnel.hotline = mk_socket(HOT_LINE_SOCKET, path)) == -1) { ERROR("Unable to create hotline socket"); destroy(); return; } - // create the thread - if (pthread_create(&g_tunnel.tid, NULL,(void *(*)(void *))multiplex, (void*)&g_tunnel) != 0) + if (pthread_create(&g_tunnel.tid, NULL,(void* (*)(void *))multiplex, (void*)&g_tunnel) != 0) { ERROR("pthread_create: cannot create tunnel multiplex thread: %s\n", strerror(errno)); destroy(); } + LOG("Tunnel plugin initialised"); + g_tunnel.initialized = 1; } static void free_subscribers(bst_node_t* node, void** args, int argc) @@ -629,24 +640,26 @@ static void free_subscribers(bst_node_t* node, void** args, int argc) (void) args; antd_tunnel_channel_t* channel = (antd_tunnel_channel_t*) node->data; destroy_channel(channel); + node->data = NULL; } void destroy() { char path[BUFFLEN]; - pthread_mutex_lock(&g_tunnel.lock); - if(g_tunnel.tid != -1) - pthread_join(g_tunnel.tid, NULL); - - bst_for_each(g_tunnel.channels, free_subscribers, NULL, 0); - if(g_tunnel.hotline != -1) + if(g_tunnel.initialized) { - (void) close(g_tunnel.hotline); - (void) snprintf(path, BUFFLEN, "%s/%s/%s", __plugin__.tmpdir, SOCK_DIR_NAME, HOT_LINE_SOCKET); - (void) unlink(path); + pthread_mutex_lock(&g_tunnel.lock); + bst_for_each(g_tunnel.channels, free_subscribers, NULL, 0); + if(g_tunnel.hotline != -1) + { + (void) close(g_tunnel.hotline); + (void) snprintf(path, BUFFLEN, "%s/%s/%s", __plugin__.tmpdir, SOCK_DIR_NAME, HOT_LINE_SOCKET); + (void) unlink(path); + } + pthread_mutex_unlock(&g_tunnel.lock); + (void)pthread_join(g_tunnel.tid, NULL); + bst_free(g_tunnel.channels); + pthread_mutex_destroy(&g_tunnel.lock); } - bst_free(g_tunnel.channels); - pthread_mutex_unlock(&g_tunnel.lock); - pthread_mutex_destroy(&g_tunnel.lock); } static void process_client_message(antd_tunnel_msg_t* msg, antd_client_t* client) { @@ -666,7 +679,7 @@ static void process_client_message(antd_tunnel_msg_t* msg, antd_client_t* client channel = (antd_tunnel_channel_t*)node->data; if(channel) { - if(msg_write(channel->sock, &msg) == -1) + if(msg_write(channel->sock, msg) == -1) { ERROR("Unable to write data to channel [%s] from client %d", channel->name, msg->header.client_id); } @@ -681,7 +694,7 @@ static void process_client_message(antd_tunnel_msg_t* msg, antd_client_t* client msg->header.type = CHANNEL_ERROR; (void) snprintf(buff, BUFFLEN, "Channel name is too long. Max length is %d", MAX_CHANNEL_NAME); msg->header.size = strlen(buff); - msg->data = buff; + msg->data = (uint8_t*)buff; ERROR("%s", buff); write_msg_to_client(msg, client); return; @@ -721,7 +734,7 @@ static void process_client_message(antd_tunnel_msg_t* msg, antd_client_t* client msg->header.type = CHANNEL_ERROR; (void) snprintf(buff, BUFFLEN, "Channel not found"); msg->header.size = strlen(buff); - msg->data = buff; + msg->data = (uint8_t*)buff; ERROR("%s", buff); write_msg_to_client(msg, client); } @@ -732,9 +745,9 @@ static void process_client_message(antd_tunnel_msg_t* msg, antd_client_t* client break; } } -void *handle(void *rqdata) +void *handle(void *rq_data) { - antd_request_t *rq = (antd_request_t *)rqdata; + antd_request_t *rq = (antd_request_t *)rq_data; antd_task_t *task = antd_create_task(NULL, (void *)rq, NULL, time(NULL)); ws_msg_header_t *h = NULL; antd_tunnel_msg_t msg; @@ -744,7 +757,7 @@ void *handle(void *rqdata) int rc, long_value, offset; task->priority++; int cl_fd = ((antd_client_t *)rq->client)->sock; - if(g_tunnel.tid == -1) + if(g_tunnel.initialized == 0) { ERROR("The tunnel plugin is not initialised correctly"); return task; @@ -856,6 +869,7 @@ void *handle(void *rqdata) } free(h); } + } } reschedule_task: task->handle = handle;