1
0
mirror of https://github.com/lxsang/antd-tunnel-plugin synced 2024-12-27 10:18:20 +01:00

use minima frame format on client side

This commit is contained in:
Dany LE 2021-10-27 13:49:59 +02:00
parent 14a06ab344
commit 9406c4d33e

View File

@ -67,8 +67,11 @@ typedef struct
uint8_t *data; uint8_t *data;
} antd_tunnel_msg_t; } antd_tunnel_msg_t;
/** /**
* Message is sent in the following format * Message between tunnel and publishers is sent in the following format
* |BEGIN MAGIC(2)|MSG TYPE(1)| CHANNEL ID (2)| CLIENT ID (2)| data length (2)| data(m) | END MAGIC(2)| * |BEGIN MAGIC(2)|MSG TYPE(1)| CHANNEL ID (2)| CLIENT ID (2)| data length (4)| data(m) | END MAGIC(2)|
*
* Message between tunnel and client is sent in the following minima format
* |MSG TYPE(1)| CHANNEL ID (2)| CLIENT ID (2)| data(m) |
*/ */
typedef struct { typedef struct {
@ -320,27 +323,19 @@ static int msg_write(int fd, antd_tunnel_msg_t *msg)
static int write_msg_to_client(antd_tunnel_msg_t *msg, antd_client_t *client) static int write_msg_to_client(antd_tunnel_msg_t *msg, antd_client_t *client)
{ {
uint8_t *buffer; uint8_t *buffer;
uint16_t u16 = 0;
int offset = 0; int offset = 0;
int ret; int ret;
buffer = (uint8_t *)malloc(msg->header.size + buffer = (uint8_t *)malloc(msg->header.size +
sizeof((int)MSG_MAGIC_BEGIN) +
sizeof(msg->header.type) + sizeof(msg->header.type) +
sizeof(msg->header.channel_id) + sizeof(msg->header.channel_id) +
sizeof(msg->header.client_id) + sizeof(msg->header.client_id));
sizeof(msg->header.size) +
sizeof((int)MSG_MAGIC_END));
if (buffer == NULL) if (buffer == NULL)
{ {
ERROR("unable to allocate memory for write"); ERROR("unable to allocate memory for write");
return -1; return -1;
} }
// magic
u16 = MSG_MAGIC_BEGIN;
(void)memcpy(buffer, &u16, sizeof(u16));
offset += sizeof(u16);
// type // type
(void)memcpy(buffer + offset, &msg->header.type, sizeof(msg->header.type)); (void)memcpy(buffer, &msg->header.type, sizeof(msg->header.type));
offset += sizeof(msg->header.type); offset += sizeof(msg->header.type);
// channel id // channel id
(void)memcpy(buffer + offset, &msg->header.channel_id, sizeof(msg->header.channel_id)); (void)memcpy(buffer + offset, &msg->header.channel_id, sizeof(msg->header.channel_id));
@ -348,18 +343,9 @@ static int write_msg_to_client(antd_tunnel_msg_t *msg, antd_client_t *client)
// client id // client id
(void)memcpy(buffer + offset, &msg->header.client_id, sizeof(msg->header.client_id)); (void)memcpy(buffer + offset, &msg->header.client_id, sizeof(msg->header.client_id));
offset += sizeof(msg->header.client_id); offset += sizeof(msg->header.client_id);
// payload length
(void)memcpy(buffer + offset, &msg->header.size, sizeof(msg->header.size));
offset += sizeof(msg->header.size);
// payload // payload
(void)memcpy(buffer + offset, msg->data, msg->header.size); (void)memcpy(buffer + offset, msg->data, msg->header.size);
offset += msg->header.size; offset += msg->header.size;
// magic end
u16 = MSG_MAGIC_END;
(void)memcpy(buffer + offset, &u16, sizeof(u16));
offset += sizeof(u16);
// write it to the websocket // write it to the websocket
ret = ws_b(client, buffer, offset); ret = ws_b(client, buffer, offset);
@ -1024,7 +1010,6 @@ void *handle(void *rq_data)
int status; int status;
fd_set fd_in; fd_set fd_in;
int offset; int offset;
uint16_t u16;
bst_node_t * node = NULL; bst_node_t * node = NULL;
antd_tunnel_key_t* key_p = NULL; antd_tunnel_key_t* key_p = NULL;
const char* ssid = NULL; const char* ssid = NULL;
@ -1142,21 +1127,8 @@ void *handle(void *rq_data)
if (h->plen == 0) if (h->plen == 0)
{ {
offset = 0; offset = 0;
// verify begin magic // msg type
(void)memcpy(&u16, buffer, sizeof(u16)); (void)memcpy(&msg.header.type, buffer, sizeof(msg.header.type));
offset += sizeof(u16);
if (u16 != MSG_MAGIC_BEGIN)
{
ERROR("Invalid begin magic number: %d, expected %d", u16, MSG_MAGIC_BEGIN);
free(buffer);
free(h);
pthread_mutex_lock(&g_tunnel.lock);
bst_for_each(g_tunnel.channels, unsubscribe_notify, argv, 1);
pthread_mutex_unlock(&g_tunnel.lock);
return task;
}
// msgtype
(void)memcpy(&msg.header.type, buffer + offset, sizeof(msg.header.type));
offset += sizeof(msg.header.type); offset += sizeof(msg.header.type);
// channel id // channel id
@ -1167,32 +1139,15 @@ void *handle(void *rq_data)
(void)memcpy(&msg.header.client_id, buffer + offset, sizeof(msg.header.client_id)); (void)memcpy(&msg.header.client_id, buffer + offset, sizeof(msg.header.client_id));
offset += sizeof(msg.header.client_id); offset += sizeof(msg.header.client_id);
// data size
(void)memcpy(&msg.header.size, buffer + offset, sizeof(msg.header.size));
offset += sizeof(msg.header.size);
// data
msg.data = buffer + offset;
offset += msg.header.size;
if(offset > (int)ws_msg_len) if(offset > (int)ws_msg_len)
{ {
ERROR("Invalid message len: %d", msg.header.size); ERROR("Invalid message format");
return task; return task;
} }
// data size
// verify end magic msg.header.size = ws_msg_len - offset;
(void)memcpy(&u16, buffer + offset, sizeof(u16)); // data
offset += sizeof(u16); msg.data = buffer + offset;
if (u16 != MSG_MAGIC_END)
{
ERROR("Invalid end magic number: %d, expected %d", u16, MSG_MAGIC_END);
free(buffer);
free(h);
pthread_mutex_lock(&g_tunnel.lock);
bst_for_each(g_tunnel.channels, unsubscribe_notify, argv, 1);
pthread_mutex_unlock(&g_tunnel.lock);
return task;
}
// now we have the message // now we have the message
pthread_mutex_lock(&g_tunnel.lock); pthread_mutex_lock(&g_tunnel.lock);
process_client_message(&msg, rq->client); process_client_message(&msg, rq->client);