From 3121fc86a788c1ee7462c469915a545218fbaaf4 Mon Sep 17 00:00:00 2001 From: DanyLE Date: Wed, 29 Mar 2023 12:08:38 +0200 Subject: [PATCH] feat: support multiple hotlines, use specific keychain channel for user verification --- tunnel.c | 471 ++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 308 insertions(+), 163 deletions(-) diff --git a/tunnel.c b/tunnel.c index c0183ae..0f8fb1a 100644 --- a/tunnel.c +++ b/tunnel.c @@ -22,22 +22,21 @@ #include #define MAX_CHANNEL_NAME 64 -#define HOT_LINE_SOCKET "antd_hotline.sock" -#define KEY_CHAIN_FIFO "antunnel_keychain" -#define SOCK_DIR_NAME "channels" +#define KEYCHAIN_CHANNEL "keychain" #define COOKIE_NAME "sessionid" #define MAX_CHANNEL_ID 65535u #define KEY_LEN 40 #define USER_LEN 64 -#define MAX_SESSION_TIMEOUT (15u * 60u) //15 min +#define MAX_BACK_LOG 64 +#define MAX_SESSION_TIMEOUT (15u * 60u) // 15 min #define PING_INTERVAL 10u // 10s -#define PROCESS_TIMEOUT 30000u //30 ms +#define PROCESS_TIMEOUT 30000u // 30 ms -#define MAX_CHANNEL_PATH 512 +#define MAX_CHANNEL_PATH 108 -#define MSG_MAGIC_BEGIN (uint16_t)0x414e //AN -#define MSG_MAGIC_END (uint16_t)0x5444 //TD +#define MSG_MAGIC_BEGIN (uint16_t)0x414e // AN +#define MSG_MAGIC_END (uint16_t)0x5444 // TD #define CHANNEL_OK (uint8_t)0x0 #define CHANNEL_ERROR (uint8_t)0x1 @@ -48,7 +47,7 @@ #define CHANNEL_DATA (uint8_t)0x6 #define CHANNEL_CTRL (uint8_t)0x7 #define TUNNEL_PING (uint8_t)0x8 -//#define CHANNEL_LIST (uint8_t)0x7 +// #define CHANNEL_LIST (uint8_t)0x7 typedef struct { int sock; @@ -56,6 +55,13 @@ typedef struct bst_node_t *subscribers; } antd_tunnel_channel_t; +typedef struct +{ + int sock; + char address[MAX_CHANNEL_PATH]; + int port; +} antd_tunnel_hotline_t; + typedef struct { uint8_t type; @@ -72,7 +78,7 @@ typedef struct /** * Message between tunnel and publishers is sent in the following format * |BEGIN MAGIC(2)|MSG TYPE(1)| CHANNEL ID (2)| CLIENT ID (2)| data length (4)| data(m) | END MAGIC(2)| - * + * * Message between tunnel and client is sent in the following minima format * |MSG TYPE(1)| CHANNEL ID (2)| CLIENT ID (2)| data(m) | */ @@ -90,60 +96,21 @@ typedef struct bst_node_t *channels; bst_node_t *keychain; pthread_t tid; - int hotline; - int key_fd; + bst_node_t *hotlines; uint16_t id_allocator; uint8_t initialized; } antd_tunnel_t; static antd_tunnel_t g_tunnel; -static int mk_keychain_fifo(const char *name, char *path) -{ - // create the FIFO - (void)snprintf(path, MAX_CHANNEL_PATH, "%s/%s/%s", __plugin__.tmpdir, SOCK_DIR_NAME, name); - (void)unlink(path); - if (mkfifo(path, 0666) == -1) - { - ERROR("Unable to create keychain FIFO %s: %s", path, strerror(errno)); - return -1; - } - int fifo_fd = open(path, O_RDWR); - if (fifo_fd == -1) - { - ERROR("Unable to open FIFO %s: %s", path, strerror(errno)); - return -1; - } - LOG("Keychain FIFO: %s created", path); - return fifo_fd; -} - -static int mk_socket(const char *name, char *path) +static int mk_un_socket(antd_tunnel_hotline_t *line) { struct sockaddr_un address; address.sun_family = AF_UNIX; + // remove socket file if exists + (void)remove(line->address); // create the socket - (void)snprintf(path, MAX_CHANNEL_PATH, "%s/%s/", __plugin__.tmpdir, SOCK_DIR_NAME); - LOG("Socket path is: %s, name %s", path, name); - if (!_exist(path)) - { - LOG("Socket dir does not exist, create it: %s", path); - if (mkdir(path, 0755) == -1) - { - ERROR("Unable to create socket dir: %s =", strerror(errno)); - return -1; - } - } - - // Append the name of the socket - if (strlen(path) + strlen(name) > MAX_CHANNEL_PATH) - { - ERROR("Socket file path exceeds the maximal size of: %d", MAX_CHANNEL_PATH); - return -1; - } - (void)strcat(path, name); - - (void)strncpy(address.sun_path, path, sizeof(address.sun_path)); + (void)strncpy(address.sun_path, line->address, sizeof(address.sun_path)); int fd = socket(AF_UNIX, SOCK_STREAM, 0); if (fd == -1) { @@ -153,17 +120,67 @@ static int mk_socket(const char *name, char *path) if (bind(fd, (struct sockaddr *)(&address), sizeof(address)) == -1) { ERROR("Unable to bind name: %s to a socket: %s", address.sun_path, strerror(errno)); + close(fd); return -1; } // mark the socket as passive mode - if (listen(fd, 100) == -1) + + if (listen(fd, MAX_BACK_LOG) == -1) { - ERROR("Unable to listen to socket: %d (%s): %s", fd, path, strerror(errno)); + ERROR("Unable to listen to socket: %d (%s): %s", fd, line->address, strerror(errno)); + close(fd); return -1; } - LOG("Socket %s is created successfully: %d", path, fd); + LOG("Socket %s is created successfully: %d", line->address, fd); return fd; } +static int mk_tcp_socket(antd_tunnel_hotline_t *line) +{ + int fd = -1; + struct sockaddr_in name; + fd = socket(PF_INET, SOCK_STREAM, 0); + if (fd == -1) + { + ERROR("Unable to create TCP socket socket: %s", strerror(errno)); + return -1; + } + + if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &(int){1}, sizeof(int)) == -1) + { + ERROR("Unable to set reuse address on port %d - setsockopt: %s", line->port, strerror(errno)); + } + + memset(&name, 0, sizeof(name)); + name.sin_family = AF_INET; + name.sin_port = htons(line->port); + name.sin_addr.s_addr = htonl(INADDR_ANY); + if (bind(fd, (struct sockaddr *)&name, sizeof(name)) < 0) + { + ERROR("Unable to bind TCP socket at port %d -bind: %s", line->port, strerror(errno)); + close(fd); + return -1; + } + + if (listen(fd, MAX_BACK_LOG) < 0) + { + ERROR("Unable to listen on Port %d - listen: %s", line->port, strerror(errno)); + close(fd); + return -1; + } + return fd; +} + +static int mk_socket(antd_tunnel_hotline_t *line) +{ + if (line->port != -1) + { + return mk_tcp_socket(line); + } + else + { + return mk_un_socket(line); + } +} static int msg_check_number(int fd, uint16_t number) { @@ -304,7 +321,7 @@ static int msg_write(int fd, antd_tunnel_msg_t *msg) ERROR("Unable to write msg channel id: %s", strerror(errno)); return -1; } - //write client id + // write client id net16 = htons(msg->header.client_id); if (guard_write(fd, &net16, sizeof(msg->header.client_id)) == -1) { @@ -453,7 +470,7 @@ static void channel_open(int fd, const char *name) return; } // create socket file - (void)strncpy(channel->name, name, MAX_CHANNEL_NAME); + (void)snprintf(channel->name, MAX_CHANNEL_NAME, "%s", name); channel->sock = fd; // response with ok message msg.header.type = CHANNEL_OK; @@ -468,6 +485,17 @@ static void channel_open(int fd, const char *name) pthread_mutex_lock(&g_tunnel.lock); g_tunnel.channels = bst_insert(g_tunnel.channels, hash_val, (void *)channel); pthread_mutex_unlock(&g_tunnel.lock); + if(strncmp(name, KEYCHAIN_CHANNEL, strlen(KEYCHAIN_CHANNEL)) == 0) + { + // subscribe to it as special client + LOG("Subscribe to channel %s as special client", KEYCHAIN_CHANNEL); + msg.header.type = CHANNEL_SUBSCRIBE; + msg.header.size = 0; + if(msg_write(channel->sock,&msg) == -1) + { + ERROR("Unable to subscribe to %s for authentication", KEYCHAIN_CHANNEL); + } + } } static void channel_close(antd_tunnel_channel_t *channel) @@ -493,8 +521,18 @@ static void channel_close(antd_tunnel_channel_t *channel) } } } -static void update_keychain(int listen_fd) +static void update_keychain(antd_tunnel_msg_t *msg) { + if(msg->header.size <= KEY_LEN) + { + ERROR("Expected message data length greater than %d, got %d", KEY_LEN, msg->header.size); + return; + } + if(msg->header.size > KEY_LEN + USER_LEN) + { + ERROR("Expected message data length smaller than %d, got %d", KEY_LEN + USER_LEN, msg->header.size); + return; + } antd_tunnel_key_t *key_p = (antd_tunnel_key_t *)malloc(sizeof(antd_tunnel_key_t)); if (key_p == NULL) { @@ -503,28 +541,12 @@ static void update_keychain(int listen_fd) } (void)memset(key_p->hash, 0, KEY_LEN + 1); (void)memset(key_p->user, 0, USER_LEN + 1); - int size; - if ((size = read(listen_fd, key_p->hash, KEY_LEN)) == -1) - { - ERROR("Unable to read data from keychain FIFO: %s", strerror(errno)); - free(key_p); - return; - } - if (size != KEY_LEN) - { - ERROR("Invalid key size %d", size); - free(key_p); - return; - } - if ((size = read(listen_fd, key_p->user, USER_LEN)) == -1) - { - ERROR("Unable to read user from keychain FIFO: %s", strerror(errno)); - free(key_p); - return; - } + (void)memcpy(key_p->hash, msg->data, KEY_LEN); + (void)memcpy(key_p->user, msg->data + KEY_LEN, msg->header.size - KEY_LEN); + + LOG("User %s key %s", key_p->user, key_p->hash); // looking for key in the keychain int hash_val = simple_hash(key_p->hash); - pthread_mutex_lock(&g_tunnel.lock); bst_node_t *node = bst_find(g_tunnel.keychain, hash_val); if (node == NULL) { @@ -539,7 +561,6 @@ static void update_keychain(int listen_fd) LOG("Update existing key in the keychain for user %s", existing_key->user); free(key_p); } - pthread_mutex_unlock(&g_tunnel.lock); } static void monitor_hotline(int listen_fd) { @@ -589,7 +610,7 @@ static void monitor_hotline(int listen_fd) default: msg.header.type = CHANNEL_ERROR; - (void)snprintf(buff, MAX_CHANNEL_NAME, "Unsupported msg type %d in hotline", (int)msg.header.type); + (void)snprintf(buff, MAX_CHANNEL_NAME, "Unsupported msg type %d in hotline %d", (int)msg.header.type, listen_fd); msg.header.size = strlen(buff); if (msg.data) free(msg.data); @@ -597,11 +618,24 @@ static void monitor_hotline(int listen_fd) LOG("%s", buff); if (msg_write(fd, &msg) == -1) { - ERROR("Unable to write error to hotline"); + ERROR("Unable to write error to hotline %d", listen_fd); } break; } } + +static void handle_hotline(bst_node_t *node, void **args, int argc) +{ + (void)argc; + fd_set *fd_in = (fd_set *)args[0]; + antd_tunnel_hotline_t *line = (antd_tunnel_hotline_t *)node->data; + if (FD_ISSET(line->sock, fd_in)) + { + LOG("Got new data on hotline %d", line->sock); + monitor_hotline(line->sock); + } +} + static void handle_channel(bst_node_t *node, void **args, int argc) { antd_tunnel_msg_t msg; @@ -623,8 +657,8 @@ static void handle_channel(bst_node_t *node, void **args, int argc) node->data = NULL; return; } - //LOG("Got new data on channel %s (%d)", channel->name, channel->sock); - // handle msg read + // LOG("Got new data on channel %s (%d)", channel->name, channel->sock); + // handle msg read if (msg_read(channel->sock, &msg) == -1) { ERROR("Unable to read message from channel %s (%d)", channel->name, channel->sock); @@ -637,38 +671,56 @@ static void handle_channel(bst_node_t *node, void **args, int argc) case CHANNEL_DATA: case CHANNEL_UNSUBSCRIBE: case CHANNEL_CTRL: - // forward message to the correct client in the channel - msg.header.channel_id = node->key; - client = bst_find(channel->subscribers, msg.header.client_id); - if (client != NULL) + // special case, the channel is named keychain, which is used + // internally by tunnel and is not exposed to final user + // this channel has no subscriber + if((strncmp(channel->name, KEYCHAIN_CHANNEL, strlen(KEYCHAIN_CHANNEL)) == 0 )) { - rq = (antd_client_t *)client->data; - if (rq != NULL) + if(msg.header.type == CHANNEL_DATA) { - if (write_msg_to_client(&msg, rq) != 0) - { - ERROR("Unable to send CTRL command to client"); - // remove the client from the list - if (msg.header.type != CHANNEL_UNSUBSCRIBE) - { - // tell the other endpoint to remove the subscriber - msg.header.type = CHANNEL_UNSUBSCRIBE; - msg.header.size = 0; - if (msg_write(channel->sock, &msg) == -1) - { - ERROR("Unable to send unsubscribe notification to channel %s (%d)", channel->name, channel->sock); - } - } - } + // update keychain + update_keychain(&msg); + } + else + { + LOG("Ignore keychain message type: %d", msg.header.type); } } else { - ERROR("Unable to find client %d to write on channel %s", msg.header.client_id, channel->name); - } - if (msg.header.type == CHANNEL_UNSUBSCRIBE) - { - channel->subscribers = bst_delete(channel->subscribers, msg.header.client_id); + // forward message to the correct client in the channel + msg.header.channel_id = node->key; + client = bst_find(channel->subscribers, msg.header.client_id); + if (client != NULL) + { + rq = (antd_client_t *)client->data; + if (rq != NULL) + { + if (write_msg_to_client(&msg, rq) != 0) + { + ERROR("Unable to send CTRL command to client"); + // remove the client from the list + if (msg.header.type != CHANNEL_UNSUBSCRIBE) + { + // tell the other endpoint to remove the subscriber + msg.header.type = CHANNEL_UNSUBSCRIBE; + msg.header.size = 0; + if (msg_write(channel->sock, &msg) == -1) + { + ERROR("Unable to send unsubscribe notification to channel %s (%d)", channel->name, channel->sock); + } + } + } + } + } + else + { + ERROR("Unable to find client %d to write on channel %s", msg.header.client_id, channel->name); + } + if (msg.header.type == CHANNEL_UNSUBSCRIBE) + { + channel->subscribers = bst_delete(channel->subscribers, msg.header.client_id); + } } break; @@ -687,7 +739,7 @@ static void handle_channel(bst_node_t *node, void **args, int argc) free(msg.data); } } -static void set_sock_fd(bst_node_t *node, void **args, int argc) +static void set_channel_sock_fd(bst_node_t *node, void **args, int argc) { (void)argc; fd_set *fd_in = (fd_set *)args[0]; @@ -702,6 +754,21 @@ static void set_sock_fd(bst_node_t *node, void **args, int argc) } } } +static void set_hotline_sock_fd(bst_node_t *node, void **args, int argc) +{ + (void)argc; + fd_set *fd_in = (fd_set *)args[0]; + int *max_fd = args[1]; + antd_tunnel_hotline_t *line = (antd_tunnel_hotline_t *)node->data; + if (line != NULL && line->sock != -1) + { + FD_SET(line->sock, fd_in); + if (*max_fd < line->sock) + { + *max_fd = line->sock; + } + } +} static void *multiplex(void *data_p) { int max_fdm; @@ -715,38 +782,30 @@ static void *multiplex(void *data_p) while (status == 0) { FD_ZERO(&fd_in); - FD_SET(tunnel_p->hotline, &fd_in); - FD_SET(tunnel_p->key_fd, &fd_in); - max_fdm = tunnel_p->hotline > tunnel_p->key_fd ? tunnel_p->hotline : tunnel_p->key_fd; + max_fdm = 0; pthread_mutex_lock(&tunnel_p->lock); args[0] = (void *)&fd_in; args[1] = (void *)&max_fdm; - bst_for_each(tunnel_p->channels, set_sock_fd, args, 2); + bst_for_each(tunnel_p->channels, set_channel_sock_fd, args, 2); + bst_for_each(tunnel_p->hotlines, set_hotline_sock_fd, args, 2); pthread_mutex_unlock(&tunnel_p->lock); rc = select(max_fdm + 1, &fd_in, NULL, NULL, NULL); switch (rc) { case -1: - LOG("Error %d on select()\n", errno); + ERROR("Error on select(): %s", strerror(errno)); status = 1; break; case 0: break; // we have data default: - if (FD_ISSET(tunnel_p->hotline, &fd_in)) - { - // LOG("Got new data on hotline"); - monitor_hotline(tunnel_p->hotline); - } - if (FD_ISSET(tunnel_p->key_fd, &fd_in)) - { - update_keychain(tunnel_p->key_fd); - } - pthread_mutex_lock(&tunnel_p->lock); - closed_channels = list_init(); args[0] = (void *)&fd_in; args[1] = (void *)&closed_channels; + bst_for_each(tunnel_p->hotlines, handle_hotline, args, 1); + + pthread_mutex_lock(&tunnel_p->lock); + closed_channels = list_init(); bst_for_each(tunnel_p->channels, handle_channel, args, 2); list_for_each(item, closed_channels) { @@ -760,30 +819,102 @@ static void *multiplex(void *data_p) return NULL; } +static int init_hotlines() +{ + char *tmp; + regmatch_t regex_matches[3]; + // read plugin configuration + if (!__plugin__.config) + { + PLUGIN_PANIC("No plugin configuration found. Please specify it in server config file"); + return -1; + } + tmp = (char *)dvalue(__plugin__.config, "hotlines"); + // the hotlines can be multiple UNIX or TCP sockets separated by comma + // e.g antd plugin config: hotlines = unix:/tmp/hotline.sock,192.168.1.71:5533 + if (!tmp) + { + PLUGIN_PANIC("No hotlines configuration found"); + return -1; + } + // split the configuration by , + list_t socket_list = split(tmp, ","); + antd_tunnel_hotline_t *line; + item_t item; + list_for_each(item, socket_list) + { + line = NULL; + tmp = (char *)item->value.ptr; + if (strncmp(tmp, "unix:", 5) == 0) + { + if (strlen(tmp + 5) > MAX_CHANNEL_PATH - 1) + { + ERROR("socket configuration is too long: %s", tmp); + continue; + } + line = (antd_tunnel_hotline_t *)malloc(sizeof(antd_tunnel_hotline_t)); + line->port = -1; + snprintf(line->address, MAX_CHANNEL_PATH, "%s", tmp + 5); + LOG("Found Unix domain socket configuration: %s", line->address); + } + else if (regex_match("^([a-zA-Z0-9\\-_\\.]+):([0-9]+)$", tmp, 3, regex_matches)) + { + if (regex_matches[1].rm_eo - regex_matches[1].rm_so > MAX_CHANNEL_PATH - 1) + { + ERROR("socket configuration is too long: %s", tmp); + continue; + } + line = (antd_tunnel_hotline_t *)malloc(sizeof(antd_tunnel_hotline_t)); + memcpy(line->address, tmp + regex_matches[2].rm_so, regex_matches[2].rm_eo - regex_matches[2].rm_so); + line->port = atoi(line->address); + (void *)memset(line->address, 0, MAX_CHANNEL_PATH); + memcpy(line->address, tmp + regex_matches[1].rm_so, regex_matches[1].rm_eo - regex_matches[1].rm_so); + LOG("Found TCP socket configuration: %s:%d", line->address, line->port); + } + else + { + ERROR("Unknown socket configuration: %s", tmp); + continue; + } + // create the socket + if (line) + { + line->sock = mk_socket(line); + if (line->sock == -1) + { + ERROR("Unable to create hotline socket"); + free(line); + line = NULL; + } + else + { + g_tunnel.hotlines = bst_insert(g_tunnel.hotlines, line->sock, (void *)line); + } + } + } + list_free(&socket_list); + // panic if not hotline + if (!g_tunnel.hotlines) + { + PLUGIN_PANIC("Unable to initialize tunnel hotlines"); + return -1; + } + return 0; +} + void init() { - char path[MAX_CHANNEL_PATH]; - // initialise the lock - (void)pthread_mutex_init(&g_tunnel.lock, NULL); // initialise the channel - g_tunnel.hotline = -1; + g_tunnel.hotlines = NULL; g_tunnel.channels = NULL; g_tunnel.id_allocator = 0; g_tunnel.initialized = 0; g_tunnel.keychain = NULL; - g_tunnel.key_fd = -1; - if ((g_tunnel.hotline = mk_socket(HOT_LINE_SOCKET, path)) == -1) - { - ERROR("Unable to create hotline socket"); - destroy(); + if (init_hotlines() != 0) return; - } - if ((g_tunnel.key_fd = mk_keychain_fifo(KEY_CHAIN_FIFO, path)) == -1) - { - ERROR("Unable to create keychain FIFO"); - destroy(); - return; - } + // initialise the lock + (void)pthread_mutex_init(&g_tunnel.lock, NULL); + // create the thread if (pthread_create(&g_tunnel.tid, NULL, (void *(*)(void *))multiplex, (void *)&g_tunnel) != 0) { @@ -802,34 +933,34 @@ static void free_subscribers(bst_node_t *node, void **args, int argc) destroy_channel(channel); node->data = NULL; } +static void free_hotlines(bst_node_t *node, void **args, int argc) +{ + (void)argc; + (void)args; + antd_tunnel_hotline_t *line = (antd_tunnel_hotline_t *)node->data; + (void)close(node->key); + if (line->port == 0) + (void)unlink(line->address); + (void)free(node->data); + node->data = NULL; +} void destroy() { - char path[MAX_CHANNEL_PATH]; if (g_tunnel.initialized) { pthread_mutex_lock(&g_tunnel.lock); bst_for_each(g_tunnel.channels, free_subscribers, NULL, 0); + bst_for_each(g_tunnel.hotlines, free_hotlines, NULL, 0); pthread_mutex_unlock(&g_tunnel.lock); (void)pthread_join(g_tunnel.tid, NULL); bst_free(g_tunnel.channels); bst_free(g_tunnel.keychain); + bst_free(g_tunnel.hotlines); pthread_mutex_destroy(&g_tunnel.lock); LOG("Antd tunnel is destroyed"); } - if (g_tunnel.hotline != -1) - { - (void)close(g_tunnel.hotline); - (void)snprintf(path, MAX_CHANNEL_PATH, "%s/%s/%s", __plugin__.tmpdir, SOCK_DIR_NAME, HOT_LINE_SOCKET); - (void)unlink(path); - } - if (g_tunnel.key_fd != -1) - { - (void)close(g_tunnel.key_fd); - (void)snprintf(path, MAX_CHANNEL_PATH, "%s/%s/%s", __plugin__.tmpdir, SOCK_DIR_NAME, KEY_CHAIN_FIFO); - (void)unlink(path); - } } -static void process_client_message(antd_tunnel_msg_t *msg, antd_client_t *client, antd_tunnel_key_t * key) +static void process_client_message(antd_tunnel_msg_t *msg, antd_client_t *client, antd_tunnel_key_t *key) { char buff[BUFFLEN + 1]; bst_node_t *node; @@ -886,6 +1017,20 @@ static void process_client_message(antd_tunnel_msg_t *msg, antd_client_t *client buff[msg->header.size] = '\0'; hash_val = hash(buff, MAX_CHANNEL_ID); LOG("Requested channel: [%s]: %d", buff, hash_val); + if(strncmp(buff, KEYCHAIN_CHANNEL, strlen(KEYCHAIN_CHANNEL)) == 0) + { + // send error + msg->header.type = CHANNEL_ERROR; + (void)snprintf(buff, BUFFLEN,"Channel %s is reserved for internal use only", KEYCHAIN_CHANNEL); + msg->header.size = strlen(buff); + msg->data = (uint8_t *)buff; + ERROR("%s", buff); + if (write_msg_to_client(msg, client) != 0) + { + ERROR("Unable to send error message to client"); + } + return; + } } else { @@ -914,7 +1059,7 @@ static void process_client_message(antd_tunnel_msg_t *msg, antd_client_t *client } msg->header.client_id = g_tunnel.id_allocator; msg->header.size = strlen(key->user) + 1; - (void)memset(buff,0, BUFFLEN + 1); + (void)memset(buff, 0, BUFFLEN + 1); (void)memcpy(buff, key->user, msg->header.size - 1); msg->header.type = CHANNEL_SUBSCRIBE; } @@ -1102,7 +1247,7 @@ void *handle(void *rq_data) timeout.tv_usec = PROCESS_TIMEOUT; pfd.fd = client->sock; pfd.events = POLLIN; - status = poll(&pfd, 1, PROCESS_TIMEOUT/ 1000); + status = poll(&pfd, 1, PROCESS_TIMEOUT / 1000); switch (status) { case -1: @@ -1118,7 +1263,7 @@ void *handle(void *rq_data) select(0, NULL, NULL, NULL, &timeout); break; default: - if(pfd.revents & (POLLERR | POLLHUP)) + if (pfd.revents & (POLLERR | POLLHUP)) { ERROR("POLLHUP or POLLERR found"); pthread_mutex_lock(&g_tunnel.lock); @@ -1211,7 +1356,7 @@ void *handle(void *rq_data) { LOG("Websocket: Text data is not supported"); pthread_mutex_lock(&g_tunnel.lock); - //ws_close(rq->client, 1011); + // ws_close(rq->client, 1011); bst_for_each(g_tunnel.channels, unsubscribe_notify, argv, 1); pthread_mutex_unlock(&g_tunnel.lock); free(h); @@ -1250,7 +1395,7 @@ void *handle(void *rq_data) { // close the connection pthread_mutex_lock(&g_tunnel.lock); - //ws_close(rq->client, 1011); + // ws_close(rq->client, 1011); bst_for_each(g_tunnel.channels, unsubscribe_notify, argv, 1); pthread_mutex_unlock(&g_tunnel.lock); ERROR("Unable to ping client, close the connection: %d", client->sock);