1
0
mirror of https://github.com/lxsang/antd-tunnel-plugin synced 2024-07-01 13:09:46 +02:00

Add keychain manager to tunnel plugin, user need to login to use the tunnel service

This commit is contained in:
lxsang 2021-10-04 22:15:19 +02:00
commit fa38afe0e2
5 changed files with 596 additions and 381 deletions

28
.drone.yml Normal file
View File

@ -0,0 +1,28 @@
---
kind: pipeline
type: exec
name: default
platform:
os: linux
arch: amd64
clone:
disable: true
steps:
- name: clone
commands:
- pwd
- git clone ssh://git@iohub.dev:2222/lxsang/antd-tunnel-plugin.git
- cd ./antd-tunnel-plugin && git checkout master
- name: build
commands:
- cd ./antd-tunnel-plugin
- libtoolize
- aclocal
- autoconf
- automake --add-missing
- ./configure --prefix=/opt/cloud/artifacts/plugins
- make
- make install
trigger:
branch:
- master

21
LICENSE Normal file
View File

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2020 Xuan Sang LE
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@ -1,45 +1,33 @@
# antd-wterm-plugin
**wterm** is an [antd plugin](https://github.com/lxsang/ant-http) providing the Terminal gateway to the web using websocket.
# antd-tunnel-plugin
**tunnel** is an [antd plugin](https://github.com/lxsang/ant-http) providing a generic purpose publish/subscribe message protocol using a single websocket connection.
## Build from source
As **wterm** is an **Antd's** plugin, the server must be pre-installed
As **tunnel** is an **Antd's** plugin, the server must be pre-installed
### build dep
* git
* make
* build-essential
* ant-http (libantd.so)
### build
When all dependencies are installed, the build can be done with a few single command lines:
```bash
mkdir antd
cd antd
mkdir tunnel
cd tunnel
# replace x.x.x by a version number
wget -O- https://get.bitdojo.dev/antd_plugin | bash -s "wterm-x.x.x"
wget -O- https://get.iohub.dev/antd_plugin | bash -s "tunnel-x.x.x"
# or install from a tarball distribution in dist/
tar xvzf wterm-x.x.x.tar.gz
cd wterm-x.x.x
tar xvzf tunnel-x.x.x.tar.gz
cd tunnel-x.x.x
./configure --prefix=/opt/www --enable-debug=yes
make
sudo make install
```
## Run
To run the Antd server with the **wterm** plugin:
```sh
/path/to/your/build/antd
```
Web applications can be put on **/path/to/your/build/htdocs**, the web socket to **wterm** is available at:
```
ws://your_host:your_port/wterm
```
This websocket address can be used with [xterm.js](https://xtermjs.org) to provide web based termnial access
### Generate distribution
```sh
libtoolize

Binary file not shown.

286
tunnel.c
View File

@ -11,10 +11,13 @@
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/ioctl.h>
#include <sys/select.h>
#include <antd/bst.h>
#include <antd/scheduler.h>
#include <antd/ws.h>
#include <time.h>
#include <fcntl.h>
#include <sys/time.h>
#define MAX_CHANNEL_NAME 64
#define HOT_LINE_SOCKET "antd_hotline.sock"
@ -23,8 +26,10 @@
#define COOKIE_NAME "sessionid"
#define MAX_CHANNEL_ID 65535u
#define KEY_LEN 20
//#define MAX_SESSION_TIMEOUT
#define KEY_LEN 40
#define MAX_SESSION_TIMEOUT (15u*60u) //15 min
#define PING_INTERVAL 10u // 10s
#define PROCESS_TIMEOUT 30000u //30 ms
#define MAX_CHANNEL_PATH (sizeof(__plugin__.tmpdir) + strlen(SOCK_DIR_NAME) + strlen(HOT_LINE_SOCKET) + 2)
@ -39,21 +44,25 @@
#define CHANNEL_CLOSE (uint8_t)0x5
#define CHANNEL_DATA (uint8_t)0x6
#define CHANNEL_CTRL (uint8_t)0x7
#define TUNNEL_PING (uint8_t)0x8
//#define CHANNEL_LIST (uint8_t)0x7
typedef struct {
typedef struct
{
int sock;
char name[MAX_CHANNEL_NAME];
bst_node_t *subscribers;
} antd_tunnel_channel_t;
typedef struct {
typedef struct
{
uint8_t type;
uint16_t channel_id;
uint16_t client_id;
uint16_t size;
uint32_t size;
} antd_tunnel_msg_h_t;
typedef struct{
typedef struct
{
antd_tunnel_msg_h_t header;
uint8_t *data;
} antd_tunnel_msg_t;
@ -96,7 +105,7 @@ static int mk_keychain_fifo(const char* name, char * path)
ERROR("Unable to open FIFO %s: %s", path, strerror(errno));
return -1;
}
M_LOG("Keychain FIFO: %s created", path);
LOG("Keychain FIFO: %s created", path);
return fifo_fd;
}
@ -125,7 +134,6 @@ static int mk_socket(const char* name, char* path)
}
(void)strcat(path, name);
(void)strncpy(address.sun_path, path, sizeof(address.sun_path));
int fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (fd == -1)
@ -151,14 +159,14 @@ static int mk_socket(const char* name, char* path)
static int msg_check_number(int fd, uint16_t number)
{
uint16_t value;
if(read(fd,&value,sizeof(value)) == -1)
if (guard_read(fd, &value, sizeof(value)) == -1)
{
ERROR("Unable to read integer value on socket %d: %s", fd, strerror(errno));
return -1;
}
if (number != value)
{
ERROR("Value mismatches: %0x%02X, expected %0x%02X", value, number);
ERROR("Value mismatches: 0x%02X, expected 0x%02X", value, number);
return -1;
}
return 0;
@ -167,7 +175,7 @@ static int msg_check_number(int fd, uint16_t number)
static int msg_read_string(int fd, char* buffer, uint8_t max_length)
{
uint8_t size;
if(read(fd,&size,sizeof(size)) == -1)
if(guard_read(fd,&size,sizeof(size)) == -1)
{
ERROR("Unable to read string size: %s", strerror(errno));
return -1;
@ -177,7 +185,7 @@ static int msg_read_string(int fd, char* buffer, uint8_t max_length)
ERROR("String length exceeds the maximal value of ", max_length);
return -1;
}
if(read(fd,buffer,size) == -1)
if(guard_read(fd,buffer,size) == -1)
{
ERROR("Unable to read string to buffer: %s", strerror(errno));
return -1;
@ -186,10 +194,10 @@ static int msg_read_string(int fd, char* buffer, uint8_t max_length)
}
*/
static uint8_t* msg_read_payload(int fd, uint16_t* size)
static uint8_t *msg_read_payload(int fd, uint32_t *size)
{
uint8_t *data;
if(read(fd,size,sizeof(*size)) == -1)
if (guard_read(fd, size, sizeof(*size)) == -1)
{
ERROR("Unable to read payload data size: %s", strerror(errno));
return NULL;
@ -205,7 +213,7 @@ static uint8_t* msg_read_payload(int fd, uint16_t* size)
ERROR("Unable to allocate memory for payload data: %s", strerror(errno));
return NULL;
}
if(read(fd,data,*size) == -1)
if (guard_read(fd, data, *size) == -1)
{
ERROR("Unable to read payload data to buffer: %s", strerror(errno));
free(data);
@ -222,22 +230,22 @@ static int msg_read(int fd, antd_tunnel_msg_t* msg)
ERROR("Unable to check begin magic number on socket: %d", fd);
return -1;
}
if(read(fd,&msg->header.type,sizeof(msg->header.type)) == -1)
if (guard_read(fd, &msg->header.type, sizeof(msg->header.type)) == -1)
{
ERROR("Unable to read msg type: %s", strerror(errno));
return -1;
}
if(msg->header.type > 0x6)
if (msg->header.type > 0x8)
{
ERROR("Unknown msg type: %d", msg->header.type);
return -1;
}
if(read(fd, &msg->header.channel_id, sizeof(msg->header.channel_id)) == -1)
if (guard_read(fd, &msg->header.channel_id, sizeof(msg->header.channel_id)) == -1)
{
ERROR("Unable to read msg channel id");
return -1;
}
if(read(fd, &msg->header.client_id, sizeof(msg->header.client_id)) == -1)
if (guard_read(fd, &msg->header.client_id, sizeof(msg->header.client_id)) == -1)
{
ERROR("Unable to read msg client id");
return -1;
@ -263,31 +271,31 @@ static int msg_write(int fd, antd_tunnel_msg_t* msg)
{
// write begin magic number
uint16_t number = MSG_MAGIC_BEGIN;
if(write(fd,&number, sizeof(number)) == -1)
if (guard_write(fd, &number, sizeof(number)) == -1)
{
ERROR("Unable to write begin magic number: %s", strerror(errno));
return -1;
}
// write type
if(write(fd,&msg->header.type, sizeof(msg->header.type)) == -1)
if (guard_write(fd, &msg->header.type, sizeof(msg->header.type)) == -1)
{
ERROR("Unable to write msg type: %s", strerror(errno));
return -1;
}
// write channel id
if(write(fd,&msg->header.channel_id, sizeof(msg->header.channel_id)) == -1)
if (guard_write(fd, &msg->header.channel_id, sizeof(msg->header.channel_id)) == -1)
{
ERROR("Unable to write msg channel id: %s", strerror(errno));
return -1;
}
//write client id
if(write(fd,&msg->header.client_id, sizeof(msg->header.client_id)) == -1)
if (guard_write(fd, &msg->header.client_id, sizeof(msg->header.client_id)) == -1)
{
ERROR("Unable to write msg client id: %s", strerror(errno));
return -1;
}
// write payload len
if(write(fd,&msg->header.size, sizeof(msg->header.size)) == -1)
if (guard_write(fd, &msg->header.size, sizeof(msg->header.size)) == -1)
{
ERROR("Unable to write msg payload length: %s", strerror(errno));
return -1;
@ -295,25 +303,26 @@ static int msg_write(int fd, antd_tunnel_msg_t* msg)
// write payload data
if (msg->header.size > 0)
{
if(write(fd,msg->data, msg->header.size) == -1)
if (guard_write(fd, msg->data, msg->header.size) == -1)
{
ERROR("Unable to write msg payload: %s", strerror(errno));
return -1;
}
}
number = MSG_MAGIC_END;
if(write(fd,&number, sizeof(number)) == -1)
if (guard_write(fd, &number, sizeof(number)) == -1)
{
ERROR("Unable to write end magic number: %s", strerror(errno));
return -1;
}
return 0;
}
static void write_msg_to_client(antd_tunnel_msg_t* msg, antd_client_t* client)
static int write_msg_to_client(antd_tunnel_msg_t *msg, antd_client_t *client)
{
uint8_t *buffer;
uint16_t u16 = 0;
int offset = 0;
int ret;
buffer = (uint8_t *)malloc(msg->header.size +
sizeof((int)MSG_MAGIC_BEGIN) +
sizeof(msg->header.type) +
@ -324,7 +333,7 @@ static void write_msg_to_client(antd_tunnel_msg_t* msg, antd_client_t* client)
if (buffer == NULL)
{
ERROR("unable to allocate memory for write");
return;
return -1;
}
// magic
u16 = MSG_MAGIC_BEGIN;
@ -352,10 +361,10 @@ static void write_msg_to_client(antd_tunnel_msg_t* msg, antd_client_t* client)
offset += sizeof(u16);
// write it to the websocket
ws_b(client,buffer, offset);
ret = ws_b(client, buffer, offset);
free(buffer);
return ret;
}
static void unsubscribe(bst_node_t *node, void **argv, int argc)
{
@ -370,7 +379,10 @@ static void unsubscribe(bst_node_t* node, void** argv, int argc)
msg.header.type = CHANNEL_UNSUBSCRIBE;
msg.header.size = 0;
msg.data = NULL;
write_msg_to_client(&msg,(antd_client_t*)node->data);
if (write_msg_to_client(&msg, (antd_client_t *)node->data) != 0)
{
ERROR("Unable to send unsubscribe message to client");
}
}
}
static void destroy_channel(antd_tunnel_channel_t *channel)
@ -480,31 +492,39 @@ static void channel_close(antd_tunnel_channel_t* channel)
static void update_keychain(int listen_fd)
{
antd_tunnel_key_t* key_p = (antd_tunnel_key_t*) malloc(sizeof(antd_tunnel_key_t));
if(key == NULL)
if(key_p == NULL)
{
ERROR("Unable to allocate memory for key");
return;
}
(void)memset(key_p->hash, 0, KEY_LEN + 1);
if (read(listen_fd, key_p->hash, KEY_LEN) == -1)
int size;
if ((size = read(listen_fd, key_p->hash, KEY_LEN)) == -1)
{
ERROR("Unable to read data from keychain FIFO: %s", strerror(errno));
free(key_p);
return;
}
if(size != KEY_LEN)
{
ERROR("Invalid key size %d", size);
free(key_p);
return;
}
// looking for key in the keychain
int hash_val = simple_hash(key_p->hash);
pthread_mutex_lock(&g_tunnel.lock);
antd_tunnel_key_t* node = (antd_tunnel_key_t*)bst_find(g_tunnel.keychain, hash_val);
bst_node_t* node = bst_find(g_tunnel.keychain, hash_val);
if(node == NULL)
{
key_p->last_update = time(NULL);
bst_insert(g_tunnel.keychain, hash_val, (void*) key_p);
LOG("New key add to the keychain: %s", key_p->hash);
g_tunnel.keychain = bst_insert(g_tunnel.keychain, hash_val, (void*) key_p);
LOG("New key added to the keychain (%d)", hash_val);
}
else
{
node->last_update = time(NULL);
antd_tunnel_key_t* existing_key = (antd_tunnel_key_t*)node->data;
existing_key->last_update = time(NULL);
LOG("Update existing key in the keychain");
free(key_p);
}
@ -614,7 +634,21 @@ static void handle_channel(bst_node_t* node, void** args, int argc)
rq = (antd_client_t *)client->data;
if (rq != NULL)
{
write_msg_to_client(&msg, rq);
if (write_msg_to_client(&msg, rq) != 0)
{
ERROR("Unable to send CTRL command to client");
// remove the client from the list
if (msg.header.type != CHANNEL_UNSUBSCRIBE)
{
// tell the other endpoint to remove the subscriber
msg.header.type = CHANNEL_UNSUBSCRIBE;
msg.header.size = 0;
if (msg_write(channel->sock, &msg) == -1)
{
ERROR("Unable to send unsubscribe notification to channel %s (%d)", channel->name, channel->sock);
}
}
}
}
}
else
@ -767,7 +801,9 @@ void destroy()
pthread_mutex_unlock(&g_tunnel.lock);
(void)pthread_join(g_tunnel.tid, NULL);
bst_free(g_tunnel.channels);
bst_free(g_tunnel.keychain);
pthread_mutex_destroy(&g_tunnel.lock);
LOG("Antd tunnel is destroyed");
}
if(g_tunnel.hotline != -1)
{
@ -804,6 +840,14 @@ static void process_client_message(antd_tunnel_msg_t* msg, antd_client_t* client
if (msg_write(channel->sock, msg) == -1)
{
ERROR("Unable to write data to channel [%s] from client %d", channel->name, msg->header.client_id);
// notify client to unsubscribe
msg->header.type = CHANNEL_UNSUBSCRIBE;
msg->header.size = 0;
if (write_msg_to_client(msg, client) != 0)
{
ERROR("Unable to send unsubscribe message to client to client");
}
return;
}
}
}
@ -818,7 +862,10 @@ static void process_client_message(antd_tunnel_msg_t* msg, antd_client_t* client
msg->header.size = strlen(buff);
msg->data = (uint8_t *)buff;
ERROR("%s", buff);
write_msg_to_client(msg, client);
if (write_msg_to_client(msg, client) != 0)
{
ERROR("Unable to send error message to client");
}
return;
}
if (msg->header.size > 0)
@ -848,7 +895,10 @@ static void process_client_message(antd_tunnel_msg_t* msg, antd_client_t* client
msg->header.size = sizeof(g_tunnel.id_allocator);
(void)memcpy(buff, &g_tunnel.id_allocator, sizeof(g_tunnel.id_allocator));
msg->data = (uint8_t *)buff;
write_msg_to_client(msg, client);
if (write_msg_to_client(msg, client) != 0)
{
ERROR("Unable to send subscribe OK message to client");
}
msg->header.client_id = g_tunnel.id_allocator;
msg->header.type = CHANNEL_SUBSCRIBE;
}
@ -858,7 +908,10 @@ static void process_client_message(antd_tunnel_msg_t* msg, antd_client_t* client
msg->header.type = CHANNEL_OK;
msg->header.channel_id = hash_val;
msg->header.size = 0;
write_msg_to_client(msg, client);
if (write_msg_to_client(msg, client) != 0)
{
ERROR("Unable to send unsubscribe OK message to client");
}
msg->header.type = CHANNEL_UNSUBSCRIBE;
}
// forward to publisher
@ -878,7 +931,10 @@ static void process_client_message(antd_tunnel_msg_t* msg, antd_client_t* client
if (msg->header.type == CHANNEL_SUBSCRIBE)
{
msg->header.type = CHANNEL_ERROR;
write_msg_to_client(msg, client);
if (write_msg_to_client(msg, client) != 0)
{
ERROR("Unable to send channel not found error to client");
}
}
}
break;
@ -938,12 +994,28 @@ static void unsubscribe_notify(bst_node_t* node, void** argv, int argc)
}
}
list_free(&list);
}
static void keychain_validating(bst_node_t *node, void **argv, int argc)
{
(void)argc;
list_t *list = (list_t *)argv[0];
antd_tunnel_key_t* key_p = NULL;
if(node == NULL || node->data == NULL)
{
return;
}
key_p = (antd_tunnel_key_t*) node->data;
if(difftime(time(NULL), key_p->last_update) > (double)MAX_SESSION_TIMEOUT)
{
list_put_i(list,node->key);
}
}
void *handle(void *rq_data)
{
antd_request_t *rq = (antd_request_t *)rq_data;
antd_client_t *client = ((antd_client_t *)(rq->client));
antd_task_t *task = antd_create_task(NULL, (void *)rq, NULL, time(NULL));
ws_msg_header_t *h = NULL;
antd_tunnel_msg_t msg;
@ -953,34 +1025,84 @@ void *handle(void *rq_data)
fd_set fd_in;
int offset;
uint16_t u16;
task->priority++;
bst_node_t * node = NULL;
antd_tunnel_key_t* key_p = NULL;
const char* ssid = NULL;
dictionary_t cookie = NULL;
void *argv[1];
if (g_tunnel.initialized == 0)
{
ERROR("The tunnel plugin is not initialised correctly");
return task;
}
// update the keychain
list_t list = list_init();
argv[0] = (void*)&list;
pthread_mutex_lock(&g_tunnel.lock);
bst_for_each(g_tunnel.keychain, keychain_validating, argv, 1);
pthread_mutex_unlock(&g_tunnel.lock);
item_t item;
list_for_each(item, list)
{
pthread_mutex_lock(&g_tunnel.lock);
g_tunnel.keychain = bst_delete(g_tunnel.keychain, item->value.i);
LOG("Delete invalid key (timeout) with hash %d", item->value.i);
pthread_mutex_unlock(&g_tunnel.lock);
}
list_free(&list);
if (ws_enable(rq->request))
{
argv[0] = (void *)rq->client;
// verify if user is authorized
cookie = dvalue(rq->request, "COOKIE");
if(cookie != NULL)
{
ssid = (const char*)dvalue(cookie, COOKIE_NAME);
}
if(ssid == NULL)
{
return task;
}
pthread_mutex_lock(&g_tunnel.lock);
node = bst_find(g_tunnel.keychain, simple_hash(ssid));
pthread_mutex_unlock(&g_tunnel.lock);
if(node == NULL || node->data == NULL || strcmp(((antd_tunnel_key_t*)node->data)->hash,ssid) != 0)
{
ERROR("User unauthorized, quit");
pthread_mutex_lock(&g_tunnel.lock);
bst_for_each(g_tunnel.channels, unsubscribe_notify, argv, 1);
pthread_mutex_unlock(&g_tunnel.lock);
return task;
}
key_p = (antd_tunnel_key_t*) node->data;
pthread_mutex_lock(&g_tunnel.lock);
key_p->last_update = time(NULL);
pthread_mutex_unlock(&g_tunnel.lock);
// session is valid, continue
timeout.tv_sec = 0;
timeout.tv_usec = 500; // 5 ms
timeout.tv_usec = PROCESS_TIMEOUT;
FD_ZERO(&fd_in);
FD_SET(((antd_client_t*)(rq->client))->sock, &fd_in);
status = select(((antd_client_t*)(rq->client))->sock + 1, &fd_in, NULL, NULL, &timeout);
FD_SET(client->sock, &fd_in);
status = select(client->sock + 1, &fd_in, NULL, NULL, &timeout);
switch (status)
{
case -1:
LOG("Error %d on select()\n", errno);
pthread_mutex_lock(&g_tunnel.lock);
bst_for_each(g_tunnel.channels, unsubscribe_notify, argv, 1);
pthread_mutex_unlock(&g_tunnel.lock);
return task;
break;
case 0:
timeout.tv_sec = 0;
timeout.tv_usec = 500; // 5 ms
timeout.tv_usec = PROCESS_TIMEOUT;
select(0, NULL, NULL, NULL, &timeout);
break;
default:
argv[0] = (void*) rq->client;
pthread_mutex_lock(&g_tunnel.lock);
h = ws_read_header(rq->client);
pthread_mutex_unlock(&g_tunnel.lock);
@ -1001,7 +1123,6 @@ void *handle(void *rq_data)
{
LOG("Websocket: connection closed");
pthread_mutex_lock(&g_tunnel.lock);
//ws_close(rq->client, 1011);
bst_for_each(g_tunnel.channels, unsubscribe_notify, argv, 1);
pthread_mutex_unlock(&g_tunnel.lock);
free(h);
@ -1011,6 +1132,7 @@ void *handle(void *rq_data)
{
// we have data, now read the message,
// the message must be in bin
int ws_msg_len = h->plen;
buffer = (uint8_t *)malloc(h->plen + 1);
if (buffer)
{
@ -1028,6 +1150,9 @@ void *handle(void *rq_data)
ERROR("Invalid begin magic number: %d, expected %d", u16, MSG_MAGIC_BEGIN);
free(buffer);
free(h);
pthread_mutex_lock(&g_tunnel.lock);
bst_for_each(g_tunnel.channels, unsubscribe_notify, argv, 1);
pthread_mutex_unlock(&g_tunnel.lock);
return task;
}
// msgtype
@ -1045,10 +1170,14 @@ void *handle(void *rq_data)
// data size
(void)memcpy(&msg.header.size, buffer + offset, sizeof(msg.header.size));
offset += sizeof(msg.header.size);
// data
msg.data = buffer + offset;
offset += msg.header.size;
if(offset > (int)ws_msg_len)
{
ERROR("Invalid message len: %d", msg.header.size);
return task;
}
// verify end magic
(void)memcpy(&u16, buffer + offset, sizeof(u16));
@ -1058,6 +1187,9 @@ void *handle(void *rq_data)
ERROR("Invalid end magic number: %d, expected %d", u16, MSG_MAGIC_END);
free(buffer);
free(h);
pthread_mutex_lock(&g_tunnel.lock);
bst_for_each(g_tunnel.channels, unsubscribe_notify, argv, 1);
pthread_mutex_unlock(&g_tunnel.lock);
return task;
}
@ -1069,6 +1201,16 @@ void *handle(void *rq_data)
free(buffer);
}
}
else if(h->opcode == WS_PONG)
{
buffer = (uint8_t *)malloc(h->plen + 1);
if (buffer)
{
ws_read_data(rq->client, h, h->plen, buffer);
LOG("Receive pong message from client: %s. Client Alive", buffer);
free(buffer);
}
}
else
{
LOG("Websocket: Text data is not supported");
@ -1081,6 +1223,43 @@ void *handle(void *rq_data)
}
free(h);
}
else
{
timeout.tv_sec = 0;
timeout.tv_usec = PROCESS_TIMEOUT;
select(0, NULL, NULL, NULL, &timeout);
}
}
// check whether we need to send ping message to client
if (difftime(time(NULL), client->last_io) > (double)PING_INTERVAL)
{
/*
msg.header.type = TUNNEL_PING;
msg.header.client_id = 0;
msg.header.channel_id = 0;
msg.header.size = 0;
msg.data = NULL;
if (write_msg_to_client(&msg, client) != 0)
{
// close the connection
pthread_mutex_lock(&g_tunnel.lock);
//ws_close(rq->client, 1011);
bst_for_each(g_tunnel.channels, unsubscribe_notify, argv, 1);
pthread_mutex_unlock(&g_tunnel.lock);
ERROR("Unable to ping client, close the connection: %d", client->sock);
return task;
}*/
if(ws_ping(client,"ANTD-TUNNEL",0) != 0)
{
// close the connection
pthread_mutex_lock(&g_tunnel.lock);
//ws_close(rq->client, 1011);
bst_for_each(g_tunnel.channels, unsubscribe_notify, argv, 1);
pthread_mutex_unlock(&g_tunnel.lock);
ERROR("Unable to ping client, close the connection: %d", client->sock);
return task;
}
}
}
else
@ -1088,8 +1267,7 @@ void *handle(void *rq_data)
return task;
}
task->handle = handle;
task->type = HEAVY;
task->access_time = time(NULL);
select(0, NULL, NULL, NULL, &timeout);
antd_task_bind_event(task,rq->client->sock,0, TASK_EVT_ON_WRITABLE| TASK_EVT_ON_READABLE);
return task;
}