1
0
mirror of https://github.com/lxsang/antd-tunnel-plugin synced 2024-11-16 01:38:22 +01:00

allow ping message on tunnel

This commit is contained in:
lxsang 2020-12-22 16:00:53 +01:00
parent 61e7040426
commit 15e086441d
2 changed files with 436 additions and 382 deletions

Binary file not shown.

View File

@ -20,6 +20,8 @@
#define HOT_LINE_SOCKET "antd_hotline.sock" #define HOT_LINE_SOCKET "antd_hotline.sock"
#define SOCK_DIR_NAME "channels" #define SOCK_DIR_NAME "channels"
#define PING_INTERVAL 10u // 10s
#define MAX_CHANNEL_ID 65535u #define MAX_CHANNEL_ID 65535u
#define MAX_CHANNEL_PATH (sizeof(__plugin__.tmpdir) + strlen(SOCK_DIR_NAME) + strlen(HOT_LINE_SOCKET) + 2) #define MAX_CHANNEL_PATH (sizeof(__plugin__.tmpdir) + strlen(SOCK_DIR_NAME) + strlen(HOT_LINE_SOCKET) + 2)
@ -35,21 +37,25 @@
#define CHANNEL_CLOSE (uint8_t)0x5 #define CHANNEL_CLOSE (uint8_t)0x5
#define CHANNEL_DATA (uint8_t)0x6 #define CHANNEL_DATA (uint8_t)0x6
#define CHANNEL_CTRL (uint8_t)0x7 #define CHANNEL_CTRL (uint8_t)0x7
#define TUNNEL_PING (uint8_t)0x8
//#define CHANNEL_LIST (uint8_t)0x7 //#define CHANNEL_LIST (uint8_t)0x7
typedef struct { typedef struct
{
int sock; int sock;
char name[MAX_CHANNEL_NAME]; char name[MAX_CHANNEL_NAME];
bst_node_t *subscribers; bst_node_t *subscribers;
} antd_tunnel_channel_t; } antd_tunnel_channel_t;
typedef struct { typedef struct
{
uint8_t type; uint8_t type;
uint16_t channel_id; uint16_t channel_id;
uint16_t client_id; uint16_t client_id;
uint16_t size; uint16_t size;
} antd_tunnel_msg_h_t; } antd_tunnel_msg_h_t;
typedef struct{ typedef struct
{
antd_tunnel_msg_h_t header; antd_tunnel_msg_h_t header;
uint8_t *data; uint8_t *data;
} antd_tunnel_msg_t; } antd_tunnel_msg_t;
@ -58,7 +64,8 @@ typedef struct{
* |BEGIN MAGIC(2)|MSG TYPE(1)| CHANNEL ID (2)| CLIENT ID (2)| data length (2)| data(m) | END MAGIC(2)| * |BEGIN MAGIC(2)|MSG TYPE(1)| CHANNEL ID (2)| CLIENT ID (2)| data length (2)| data(m) | END MAGIC(2)|
*/ */
typedef struct { typedef struct
{
pthread_mutex_t lock; pthread_mutex_t lock;
bst_node_t *channels; bst_node_t *channels;
pthread_t tid; pthread_t tid;
@ -69,7 +76,6 @@ typedef struct {
static antd_tunnel_t g_tunnel; static antd_tunnel_t g_tunnel;
static int mk_socket(const char *name, char *path) static int mk_socket(const char *name, char *path)
{ {
struct sockaddr_un address; struct sockaddr_un address;
@ -95,7 +101,6 @@ static int mk_socket(const char* name, char* path)
} }
(void)strcat(path, name); (void)strcat(path, name);
(void)strncpy(address.sun_path, path, sizeof(address.sun_path)); (void)strncpy(address.sun_path, path, sizeof(address.sun_path));
int fd = socket(AF_UNIX, SOCK_STREAM, 0); int fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (fd == -1) if (fd == -1)
@ -197,7 +202,7 @@ static int msg_read(int fd, antd_tunnel_msg_t* msg)
ERROR("Unable to read msg type: %s", strerror(errno)); ERROR("Unable to read msg type: %s", strerror(errno));
return -1; return -1;
} }
if(msg->header.type > 0x6) if (msg->header.type > 0x8)
{ {
ERROR("Unknown msg type: %d", msg->header.type); ERROR("Unknown msg type: %d", msg->header.type);
return -1; return -1;
@ -279,11 +284,12 @@ static int msg_write(int fd, antd_tunnel_msg_t* msg)
} }
return 0; return 0;
} }
static void write_msg_to_client(antd_tunnel_msg_t* msg, antd_client_t* client) static int write_msg_to_client(antd_tunnel_msg_t *msg, antd_client_t *client)
{ {
uint8_t *buffer; uint8_t *buffer;
uint16_t u16 = 0; uint16_t u16 = 0;
int offset = 0; int offset = 0;
int ret;
buffer = (uint8_t *)malloc(msg->header.size + buffer = (uint8_t *)malloc(msg->header.size +
sizeof((int)MSG_MAGIC_BEGIN) + sizeof((int)MSG_MAGIC_BEGIN) +
sizeof(msg->header.type) + sizeof(msg->header.type) +
@ -294,7 +300,7 @@ static void write_msg_to_client(antd_tunnel_msg_t* msg, antd_client_t* client)
if (buffer == NULL) if (buffer == NULL)
{ {
ERROR("unable to allocate memory for write"); ERROR("unable to allocate memory for write");
return; return -1;
} }
// magic // magic
u16 = MSG_MAGIC_BEGIN; u16 = MSG_MAGIC_BEGIN;
@ -322,10 +328,10 @@ static void write_msg_to_client(antd_tunnel_msg_t* msg, antd_client_t* client)
offset += sizeof(u16); offset += sizeof(u16);
// write it to the websocket // write it to the websocket
ws_b(client,buffer, offset); ret = ws_b(client, buffer, offset);
free(buffer); free(buffer);
return ret;
} }
static void unsubscribe(bst_node_t *node, void **argv, int argc) static void unsubscribe(bst_node_t *node, void **argv, int argc)
{ {
@ -340,7 +346,10 @@ static void unsubscribe(bst_node_t* node, void** argv, int argc)
msg.header.type = CHANNEL_UNSUBSCRIBE; msg.header.type = CHANNEL_UNSUBSCRIBE;
msg.header.size = 0; msg.header.size = 0;
msg.data = NULL; msg.data = NULL;
write_msg_to_client(&msg,(antd_client_t*)node->data); if (write_msg_to_client(&msg, (antd_client_t *)node->data) != 0)
{
ERROR("Unable to send unsubscribe message to client");
}
} }
} }
static void destroy_channel(antd_tunnel_channel_t *channel) static void destroy_channel(antd_tunnel_channel_t *channel)
@ -551,7 +560,10 @@ static void handle_channel(bst_node_t* node, void** args, int argc)
rq = (antd_client_t *)client->data; rq = (antd_client_t *)client->data;
if (rq != NULL) if (rq != NULL)
{ {
write_msg_to_client(&msg, rq); if (write_msg_to_client(&msg, rq) != 0)
{
ERROR("Unable to send CTRL command to client");
}
} }
} }
else else
@ -737,7 +749,10 @@ static void process_client_message(antd_tunnel_msg_t* msg, antd_client_t* client
msg->header.size = strlen(buff); msg->header.size = strlen(buff);
msg->data = (uint8_t *)buff; msg->data = (uint8_t *)buff;
ERROR("%s", buff); ERROR("%s", buff);
write_msg_to_client(msg, client); if (write_msg_to_client(msg, client) != 0)
{
ERROR("Unable to send error message to client");
}
return; return;
} }
if (msg->header.size > 0) if (msg->header.size > 0)
@ -767,7 +782,10 @@ static void process_client_message(antd_tunnel_msg_t* msg, antd_client_t* client
msg->header.size = sizeof(g_tunnel.id_allocator); msg->header.size = sizeof(g_tunnel.id_allocator);
(void)memcpy(buff, &g_tunnel.id_allocator, sizeof(g_tunnel.id_allocator)); (void)memcpy(buff, &g_tunnel.id_allocator, sizeof(g_tunnel.id_allocator));
msg->data = (uint8_t *)buff; msg->data = (uint8_t *)buff;
write_msg_to_client(msg, client); if (write_msg_to_client(msg, client) != 0)
{
ERROR("Unable to send subscribe OK message to client");
}
msg->header.client_id = g_tunnel.id_allocator; msg->header.client_id = g_tunnel.id_allocator;
msg->header.type = CHANNEL_SUBSCRIBE; msg->header.type = CHANNEL_SUBSCRIBE;
} }
@ -777,7 +795,10 @@ static void process_client_message(antd_tunnel_msg_t* msg, antd_client_t* client
msg->header.type = CHANNEL_OK; msg->header.type = CHANNEL_OK;
msg->header.channel_id = hash_val; msg->header.channel_id = hash_val;
msg->header.size = 0; msg->header.size = 0;
write_msg_to_client(msg, client); if (write_msg_to_client(msg, client) != 0)
{
ERROR("Unable to send unsubscribe OK message to client");
}
msg->header.type = CHANNEL_UNSUBSCRIBE; msg->header.type = CHANNEL_UNSUBSCRIBE;
} }
// forward to publisher // forward to publisher
@ -797,7 +818,10 @@ static void process_client_message(antd_tunnel_msg_t* msg, antd_client_t* client
if (msg->header.type == CHANNEL_SUBSCRIBE) if (msg->header.type == CHANNEL_SUBSCRIBE)
{ {
msg->header.type = CHANNEL_ERROR; msg->header.type = CHANNEL_ERROR;
write_msg_to_client(msg, client); if (write_msg_to_client(msg, client) != 0)
{
ERROR("Unable to send channel not found error to client");
}
} }
} }
break; break;
@ -857,12 +881,12 @@ static void unsubscribe_notify(bst_node_t* node, void** argv, int argc)
} }
} }
list_free(&list); list_free(&list);
} }
void *handle(void *rq_data) void *handle(void *rq_data)
{ {
antd_request_t *rq = (antd_request_t *)rq_data; antd_request_t *rq = (antd_request_t *)rq_data;
antd_client_t *client = ((antd_client_t *)(rq->client));
antd_task_t *task = antd_create_task(NULL, (void *)rq, NULL, time(NULL)); antd_task_t *task = antd_create_task(NULL, (void *)rq, NULL, time(NULL));
ws_msg_header_t *h = NULL; ws_msg_header_t *h = NULL;
antd_tunnel_msg_t msg; antd_tunnel_msg_t msg;
@ -882,15 +906,20 @@ void *handle(void *rq_data)
} }
if (ws_enable(rq->request)) if (ws_enable(rq->request))
{ {
argv[0] = (void *)rq->client;
timeout.tv_sec = 0; timeout.tv_sec = 0;
timeout.tv_usec = 500; // 5 ms timeout.tv_usec = 500; // 5 ms
FD_ZERO(&fd_in); FD_ZERO(&fd_in);
FD_SET(((antd_client_t*)(rq->client))->sock, &fd_in); FD_SET(client->sock, &fd_in);
status = select(((antd_client_t*)(rq->client))->sock + 1, &fd_in, NULL, NULL, &timeout);
status = select(client->sock + 1, &fd_in, NULL, NULL, &timeout);
switch (status) switch (status)
{ {
case -1: case -1:
LOG("Error %d on select()\n", errno); LOG("Error %d on select()\n", errno);
pthread_mutex_lock(&g_tunnel.lock);
bst_for_each(g_tunnel.channels, unsubscribe_notify, argv, 1);
pthread_mutex_unlock(&g_tunnel.lock);
return task; return task;
break; break;
case 0: case 0:
@ -899,7 +928,6 @@ void *handle(void *rq_data)
select(0, NULL, NULL, NULL, &timeout); select(0, NULL, NULL, NULL, &timeout);
break; break;
default: default:
argv[0] = (void*) rq->client;
pthread_mutex_lock(&g_tunnel.lock); pthread_mutex_lock(&g_tunnel.lock);
h = ws_read_header(rq->client); h = ws_read_header(rq->client);
pthread_mutex_unlock(&g_tunnel.lock); pthread_mutex_unlock(&g_tunnel.lock);
@ -920,7 +948,6 @@ void *handle(void *rq_data)
{ {
LOG("Websocket: connection closed"); LOG("Websocket: connection closed");
pthread_mutex_lock(&g_tunnel.lock); pthread_mutex_lock(&g_tunnel.lock);
//ws_close(rq->client, 1011);
bst_for_each(g_tunnel.channels, unsubscribe_notify, argv, 1); bst_for_each(g_tunnel.channels, unsubscribe_notify, argv, 1);
pthread_mutex_unlock(&g_tunnel.lock); pthread_mutex_unlock(&g_tunnel.lock);
free(h); free(h);
@ -947,6 +974,9 @@ void *handle(void *rq_data)
ERROR("Invalid begin magic number: %d, expected %d", u16, MSG_MAGIC_BEGIN); ERROR("Invalid begin magic number: %d, expected %d", u16, MSG_MAGIC_BEGIN);
free(buffer); free(buffer);
free(h); free(h);
pthread_mutex_lock(&g_tunnel.lock);
bst_for_each(g_tunnel.channels, unsubscribe_notify, argv, 1);
pthread_mutex_unlock(&g_tunnel.lock);
return task; return task;
} }
// msgtype // msgtype
@ -977,6 +1007,9 @@ void *handle(void *rq_data)
ERROR("Invalid end magic number: %d, expected %d", u16, MSG_MAGIC_END); ERROR("Invalid end magic number: %d, expected %d", u16, MSG_MAGIC_END);
free(buffer); free(buffer);
free(h); free(h);
pthread_mutex_lock(&g_tunnel.lock);
bst_for_each(g_tunnel.channels, unsubscribe_notify, argv, 1);
pthread_mutex_unlock(&g_tunnel.lock);
return task; return task;
} }
@ -1001,6 +1034,27 @@ void *handle(void *rq_data)
free(h); free(h);
} }
} }
// check whether we need to send ping message to client
if (difftime(time(NULL), client->last_io) > (double)PING_INTERVAL)
{
// send message to client
msg.header.type = TUNNEL_PING;
msg.header.client_id = 0;
msg.header.channel_id = 0;
msg.header.size = 0;
msg.data = NULL;
if (write_msg_to_client(&msg, client) != 0)
{
// close the connection
pthread_mutex_lock(&g_tunnel.lock);
//ws_close(rq->client, 1011);
bst_for_each(g_tunnel.channels, unsubscribe_notify, argv, 1);
pthread_mutex_unlock(&g_tunnel.lock);
ERROR("Unable to ping client, close the connection: %d", client->sock);
return task;
}
}
} }
else else
{ {