mirror of
https://github.com/lxsang/antd-tunnel-plugin
synced 2024-11-16 09:48:21 +01:00
working version
This commit is contained in:
parent
29c3cffbc5
commit
b09e44b67d
268
tunnel.c
268
tunnel.c
@ -269,8 +269,73 @@ static int msg_write(int fd, antd_tunnel_msg_t* msg)
|
|||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
static void write_msg_to_client(antd_tunnel_msg_t* msg, antd_client_t* client)
|
||||||
|
{
|
||||||
|
uint8_t* buffer;
|
||||||
|
int long_value = 0;
|
||||||
|
int offset = 0;
|
||||||
|
long_value = msg->header.size +
|
||||||
|
sizeof((int)MSG_MAGIC_BEGIN) +
|
||||||
|
sizeof(msg->header.type) +
|
||||||
|
sizeof(msg->header.channel_id) +
|
||||||
|
sizeof(msg->header.client_id) +
|
||||||
|
sizeof(msg->header.size) +
|
||||||
|
sizeof((int)MSG_MAGIC_END);
|
||||||
|
buffer = (uint8_t*) malloc(long_value);
|
||||||
|
if(buffer == NULL)
|
||||||
|
{
|
||||||
|
ERROR("unable to allocate memory for write");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// magic
|
||||||
|
long_value = (int) MSG_MAGIC_BEGIN;
|
||||||
|
(void)memcpy(buffer,&long_value,sizeof(long_value));
|
||||||
|
offset += sizeof(long_value);
|
||||||
|
// type
|
||||||
|
(void)memcpy(buffer+offset,&msg->header.type,sizeof(msg->header.type));
|
||||||
|
offset += sizeof(msg->header.type);
|
||||||
|
// channel id
|
||||||
|
(void)memcpy(buffer+offset,&msg->header.channel_id,sizeof(msg->header.channel_id));
|
||||||
|
offset += sizeof(msg->header.channel_id);
|
||||||
|
// client id
|
||||||
|
(void)memcpy(buffer+offset,&msg->header.client_id,sizeof(msg->header.client_id));
|
||||||
|
offset += sizeof(msg->header.client_id);
|
||||||
|
// payload length
|
||||||
|
(void)memcpy(buffer+offset,&msg->header.size,sizeof(msg->header.size));
|
||||||
|
offset += sizeof(msg->header.size);
|
||||||
|
// payload
|
||||||
|
(void)memcpy(buffer+offset,msg->data,msg->header.size);
|
||||||
|
offset += msg->header.size;
|
||||||
|
// magic end
|
||||||
|
long_value = (int) MSG_MAGIC_END;
|
||||||
|
(void)memcpy(buffer+offset,&long_value,sizeof(long_value));
|
||||||
|
offset += sizeof(long_value);
|
||||||
|
|
||||||
|
// write it to the websocket
|
||||||
|
ws_b(client,buffer, offset);
|
||||||
|
|
||||||
|
free(buffer);
|
||||||
|
|
||||||
|
}
|
||||||
|
static void unsubscribe(bst_node_t* node, void** argv, int argc)
|
||||||
|
{
|
||||||
|
// request client to unsubscribe
|
||||||
|
(void) argc;
|
||||||
|
antd_tunnel_channel_t* channel = (antd_tunnel_channel_t*) argv[0];
|
||||||
|
antd_tunnel_msg_t msg;
|
||||||
|
if(node->data != NULL)
|
||||||
|
{
|
||||||
|
msg.header.channel_id = simple_hash(channel->name);
|
||||||
|
msg.header.client_id = node->key;
|
||||||
|
msg.header.type = CHANNEL_UNSUBSCRIBE;
|
||||||
|
msg.header.size = 0;
|
||||||
|
msg.data = NULL;
|
||||||
|
write_msg_to_client(&msg,(antd_client_t*)node->data);
|
||||||
|
}
|
||||||
|
}
|
||||||
static void destroy_channel(antd_tunnel_channel_t* channel)
|
static void destroy_channel(antd_tunnel_channel_t* channel)
|
||||||
{
|
{
|
||||||
|
void* argc[1];
|
||||||
if(channel == NULL)
|
if(channel == NULL)
|
||||||
return;
|
return;
|
||||||
if(channel->sock != -1)
|
if(channel->sock != -1)
|
||||||
@ -278,7 +343,8 @@ static void destroy_channel(antd_tunnel_channel_t* channel)
|
|||||||
(void) close(channel->sock);
|
(void) close(channel->sock);
|
||||||
channel->sock = -1;
|
channel->sock = -1;
|
||||||
}
|
}
|
||||||
/** TODO: send message to all subcribers before close*/
|
argc[0] = (void*)channel;
|
||||||
|
bst_for_each(channel->subscribers,unsubscribe,argc, 1);
|
||||||
bst_free(channel->subscribers);
|
bst_free(channel->subscribers);
|
||||||
free(channel);
|
free(channel);
|
||||||
}
|
}
|
||||||
@ -351,7 +417,6 @@ static void channel_open(int fd, const char* name)
|
|||||||
static void channel_close(antd_tunnel_channel_t* channel)
|
static void channel_close(antd_tunnel_channel_t* channel)
|
||||||
{
|
{
|
||||||
antd_tunnel_msg_t msg;
|
antd_tunnel_msg_t msg;
|
||||||
|
|
||||||
msg.data = NULL;
|
msg.data = NULL;
|
||||||
msg.header.channel_id = 0;
|
msg.header.channel_id = 0;
|
||||||
msg.header.client_id = 0;
|
msg.header.client_id = 0;
|
||||||
@ -370,7 +435,6 @@ static void channel_close(antd_tunnel_channel_t* channel)
|
|||||||
LOG("Close channel: %s (%d)", channel->name, channel->sock);
|
LOG("Close channel: %s (%d)", channel->name, channel->sock);
|
||||||
destroy_channel(channel);
|
destroy_channel(channel);
|
||||||
}
|
}
|
||||||
//g_tunnel.channels = bst_delete(g_tunnel.channels, node->key);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
static void monitor_hotline(int listen_fd)
|
static void monitor_hotline(int listen_fd)
|
||||||
@ -430,66 +494,12 @@ static void monitor_hotline(int listen_fd)
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
static void write_msg_to_client(antd_tunnel_msg_t* msg, antd_client_t* client)
|
|
||||||
{
|
|
||||||
uint8_t* buffer;
|
|
||||||
int long_value = 0;
|
|
||||||
int offset = 0;
|
|
||||||
long_value = msg->header.size +
|
|
||||||
sizeof((int)MSG_MAGIC_BEGIN) +
|
|
||||||
sizeof(msg->header.type) +
|
|
||||||
sizeof(msg->header.channel_id) +
|
|
||||||
sizeof(msg->header.client_id) +
|
|
||||||
sizeof(msg->header.size) +
|
|
||||||
msg->header.size +
|
|
||||||
sizeof((int)MSG_MAGIC_END);
|
|
||||||
buffer = (uint8_t*) malloc(long_value);
|
|
||||||
if(buffer == NULL)
|
|
||||||
{
|
|
||||||
ERROR("unable to allocate memory for write");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
// magic
|
|
||||||
long_value = (int) MSG_MAGIC_BEGIN;
|
|
||||||
(void)memcpy(buffer,&long_value,sizeof(long_value));
|
|
||||||
offset += sizeof(long_value);
|
|
||||||
|
|
||||||
// type
|
|
||||||
(void)memcpy(buffer+offset,&msg->header.type,sizeof(msg->header.type));
|
|
||||||
offset += sizeof(msg->header.type);
|
|
||||||
|
|
||||||
// channel id
|
|
||||||
(void)memcpy(buffer+offset,&msg->header.channel_id,sizeof(msg->header.channel_id));
|
|
||||||
offset += sizeof(msg->header.channel_id);
|
|
||||||
|
|
||||||
// client id
|
|
||||||
(void)memcpy(buffer+offset,&msg->header.client_id,sizeof(msg->header.client_id));
|
|
||||||
offset += sizeof(msg->header.client_id);
|
|
||||||
|
|
||||||
// payload length
|
|
||||||
(void)memcpy(buffer+offset,&msg->header.size,sizeof(msg->header.size));
|
|
||||||
offset += sizeof(msg->header.size);
|
|
||||||
|
|
||||||
// payload
|
|
||||||
(void)memcpy(buffer+offset,&msg->data,sizeof(msg->header.size));
|
|
||||||
offset += msg->header.size;
|
|
||||||
|
|
||||||
// magic end
|
|
||||||
long_value = (int) MSG_MAGIC_END;
|
|
||||||
(void)memcpy(buffer,&long_value,sizeof(long_value));
|
|
||||||
offset += sizeof(long_value);
|
|
||||||
|
|
||||||
// write it to the websocket
|
|
||||||
ws_b(client,buffer, offset);
|
|
||||||
|
|
||||||
free(buffer);
|
|
||||||
|
|
||||||
}
|
|
||||||
static void handle_channel(bst_node_t* node, void** args, int argc)
|
static void handle_channel(bst_node_t* node, void** args, int argc)
|
||||||
{
|
{
|
||||||
antd_tunnel_msg_t msg;
|
antd_tunnel_msg_t msg;
|
||||||
(void) argc;
|
(void) argc;
|
||||||
fd_set* fd_in = (fd_set*) args[0];
|
fd_set* fd_in = (fd_set*) args[0];
|
||||||
|
list_t* channel_list = (list_t*) args[1];
|
||||||
antd_tunnel_channel_t* channel = (antd_tunnel_channel_t*) node->data;
|
antd_tunnel_channel_t* channel = (antd_tunnel_channel_t*) node->data;
|
||||||
bst_node_t * client;
|
bst_node_t * client;
|
||||||
antd_client_t* rq;
|
antd_client_t* rq;
|
||||||
@ -517,9 +527,14 @@ static void handle_channel(bst_node_t* node, void** args, int argc)
|
|||||||
case CHANNEL_OK:
|
case CHANNEL_OK:
|
||||||
case CHANNEL_ERROR:
|
case CHANNEL_ERROR:
|
||||||
case CHANNEL_DATA:
|
case CHANNEL_DATA:
|
||||||
|
case CHANNEL_UNSUBSCRIBE:
|
||||||
// forward message to the correct client in the channel
|
// forward message to the correct client in the channel
|
||||||
msg.header.channel_id = node->key;
|
msg.header.channel_id = node->key;
|
||||||
client = bst_find(channel->subscribers, msg.header.client_id);
|
client = bst_find(channel->subscribers, msg.header.client_id);
|
||||||
|
if(msg.header.type == CHANNEL_UNSUBSCRIBE)
|
||||||
|
{
|
||||||
|
channel->subscribers = bst_delete(channel->subscribers, msg.header.client_id);
|
||||||
|
}
|
||||||
if(client != NULL)
|
if(client != NULL)
|
||||||
{
|
{
|
||||||
rq = (antd_client_t*) client->data;
|
rq = (antd_client_t*) client->data;
|
||||||
@ -533,10 +548,12 @@ static void handle_channel(bst_node_t* node, void** args, int argc)
|
|||||||
ERROR("Unable to find client %d to write on channel %s", msg.header.client_id, channel->name);
|
ERROR("Unable to find client %d to write on channel %s", msg.header.client_id, channel->name);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case CHANNEL_CLOSE:
|
case CHANNEL_CLOSE:
|
||||||
// close the current channel
|
// close the current channel
|
||||||
channel_close(channel);
|
channel_close(channel);
|
||||||
node->data = NULL;
|
node->data = NULL;
|
||||||
|
list_put_ptr(channel_list, node);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
LOG("Message type %d is not supported in client-application communication", msg.header.type);
|
LOG("Message type %d is not supported in client-application communication", msg.header.type);
|
||||||
@ -567,6 +584,8 @@ static void* multiplex(void* data_p)
|
|||||||
struct timeval timeout;
|
struct timeval timeout;
|
||||||
int rc;
|
int rc;
|
||||||
void *args[2];
|
void *args[2];
|
||||||
|
list_t closed_channels;
|
||||||
|
item_t item;
|
||||||
antd_tunnel_t* tunnel_p = (antd_tunnel_t*) data_p;
|
antd_tunnel_t* tunnel_p = (antd_tunnel_t*) data_p;
|
||||||
while(status == 0)
|
while(status == 0)
|
||||||
{
|
{
|
||||||
@ -588,8 +607,9 @@ static void* multiplex(void* data_p)
|
|||||||
status = 1;
|
status = 1;
|
||||||
break;
|
break;
|
||||||
case 0:
|
case 0:
|
||||||
// time out
|
timeout.tv_sec = 0;
|
||||||
// sleep here
|
timeout.tv_usec = 500; // 5 ms
|
||||||
|
select(0, NULL, NULL, NULL, &timeout);
|
||||||
break;
|
break;
|
||||||
// we have data
|
// we have data
|
||||||
default:
|
default:
|
||||||
@ -599,8 +619,16 @@ static void* multiplex(void* data_p)
|
|||||||
monitor_hotline(tunnel_p->hotline);
|
monitor_hotline(tunnel_p->hotline);
|
||||||
}
|
}
|
||||||
pthread_mutex_lock(&tunnel_p->lock);
|
pthread_mutex_lock(&tunnel_p->lock);
|
||||||
|
closed_channels = list_init();
|
||||||
args[0] = (void*) &fd_in;
|
args[0] = (void*) &fd_in;
|
||||||
bst_for_each(tunnel_p->channels, handle_channel,args, 1);
|
args[1] = (void*) &closed_channels;
|
||||||
|
bst_for_each(tunnel_p->channels, handle_channel,args, 2);
|
||||||
|
list_for_each(item, closed_channels)
|
||||||
|
{
|
||||||
|
tunnel_p->channels = bst_delete(tunnel_p->channels, ((bst_node_t*)item->value.ptr)->key);
|
||||||
|
item->value.ptr = NULL;
|
||||||
|
}
|
||||||
|
list_free(&closed_channels);
|
||||||
pthread_mutex_unlock(&tunnel_p->lock);
|
pthread_mutex_unlock(&tunnel_p->lock);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -699,9 +727,16 @@ static void process_client_message(antd_tunnel_msg_t* msg, antd_client_t* client
|
|||||||
write_msg_to_client(msg, client);
|
write_msg_to_client(msg, client);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
if(msg->header.size > 0)
|
||||||
|
{
|
||||||
(void)memcpy(buff, msg->data, msg->header.size);
|
(void)memcpy(buff, msg->data, msg->header.size);
|
||||||
buff[msg->header.size] = '\0';
|
buff[msg->header.size] = '\0';
|
||||||
hash_val = simple_hash(buff);
|
hash_val = simple_hash(buff);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
hash_val = msg->header.channel_id;
|
||||||
|
}
|
||||||
node = bst_find(g_tunnel.channels, hash_val);
|
node = bst_find(g_tunnel.channels, hash_val);
|
||||||
if(node)
|
if(node)
|
||||||
{
|
{
|
||||||
@ -717,7 +752,10 @@ static void process_client_message(antd_tunnel_msg_t* msg, antd_client_t* client
|
|||||||
msg->header.channel_id = hash_val;
|
msg->header.channel_id = hash_val;
|
||||||
msg->header.size = sizeof(g_tunnel.id_allocator);
|
msg->header.size = sizeof(g_tunnel.id_allocator);
|
||||||
(void)memcpy(buff, &g_tunnel.id_allocator, sizeof(g_tunnel.id_allocator));
|
(void)memcpy(buff, &g_tunnel.id_allocator, sizeof(g_tunnel.id_allocator));
|
||||||
|
msg->data = (uint8_t*)buff;
|
||||||
write_msg_to_client(msg, client);
|
write_msg_to_client(msg, client);
|
||||||
|
msg->header.client_id = g_tunnel.id_allocator;
|
||||||
|
msg->header.type = CHANNEL_SUBSCRIBE;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -726,18 +764,28 @@ static void process_client_message(antd_tunnel_msg_t* msg, antd_client_t* client
|
|||||||
msg->header.channel_id = hash_val;
|
msg->header.channel_id = hash_val;
|
||||||
msg->header.size = 0;
|
msg->header.size = 0;
|
||||||
write_msg_to_client(msg, client);
|
write_msg_to_client(msg, client);
|
||||||
|
msg->header.type = CHANNEL_UNSUBSCRIBE;
|
||||||
|
}
|
||||||
|
// forward to publisher
|
||||||
|
|
||||||
|
if(msg_write(channel->sock, msg) == -1)
|
||||||
|
{
|
||||||
|
ERROR("Unable to forward subscribe/unsubscribe message to %s", channel->name);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
msg->header.type = CHANNEL_ERROR;
|
|
||||||
(void) snprintf(buff, BUFFLEN, "Channel not found");
|
(void) snprintf(buff, BUFFLEN, "Channel not found");
|
||||||
msg->header.size = strlen(buff);
|
msg->header.size = strlen(buff);
|
||||||
msg->data = (uint8_t*)buff;
|
msg->data = (uint8_t*)buff;
|
||||||
ERROR("%s", buff);
|
ERROR("%s", buff);
|
||||||
|
if(msg->header.type == CHANNEL_SUBSCRIBE)
|
||||||
|
{
|
||||||
|
msg->header.type = CHANNEL_ERROR;
|
||||||
write_msg_to_client(msg, client);
|
write_msg_to_client(msg, client);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
|
|
||||||
default:
|
default:
|
||||||
@ -745,18 +793,73 @@ static void process_client_message(antd_tunnel_msg_t* msg, antd_client_t* client
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void unsubscribe_notify_handle(bst_node_t* node, void** argv, int argc)
|
||||||
|
{
|
||||||
|
(void) argc;
|
||||||
|
antd_client_t* client = (antd_client_t*)argv[0];
|
||||||
|
antd_tunnel_channel_t* channel = (antd_tunnel_channel_t*)argv[1];
|
||||||
|
list_t* list = (list_t*)argv[2];
|
||||||
|
antd_tunnel_msg_t msg;
|
||||||
|
if((antd_client_t*)node->data == client)
|
||||||
|
{
|
||||||
|
if(channel != NULL)
|
||||||
|
{
|
||||||
|
msg.header.type = CHANNEL_UNSUBSCRIBE;
|
||||||
|
msg.header.channel_id = simple_hash(channel->name);
|
||||||
|
msg.header.client_id = node->key;
|
||||||
|
msg.header.size = 0;
|
||||||
|
msg.data = NULL;
|
||||||
|
if(msg_write(channel->sock, &msg) == -1)
|
||||||
|
{
|
||||||
|
ERROR("Unable to send unsubscribe notification of client %d to channel %s (%d)", node->key, channel->name, channel->sock);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if(list != NULL)
|
||||||
|
{
|
||||||
|
list_put_ptr(list, node);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void unsubscribe_notify(bst_node_t* node, void** argv, int argc)
|
||||||
|
{
|
||||||
|
(void)argc;
|
||||||
|
void * pargv[3];
|
||||||
|
antd_client_t* client = (antd_client_t*) argv[0];
|
||||||
|
antd_tunnel_channel_t* channel = (antd_tunnel_channel_t*) node->data;
|
||||||
|
list_t list = list_init();
|
||||||
|
item_t item;
|
||||||
|
if(channel != NULL)
|
||||||
|
{
|
||||||
|
pargv[0] = (void*) client;
|
||||||
|
pargv[1] = (void*) channel;
|
||||||
|
pargv[2] = (void*) &list;
|
||||||
|
bst_for_each(channel->subscribers,unsubscribe_notify_handle,pargv, 3);
|
||||||
|
list_for_each(item, list)
|
||||||
|
{
|
||||||
|
channel->subscribers = bst_delete(channel->subscribers, ((bst_node_t*)item->value.ptr)->key);
|
||||||
|
item->value.ptr = NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
list_free(&list);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
void *handle(void *rq_data)
|
void *handle(void *rq_data)
|
||||||
{
|
{
|
||||||
antd_request_t *rq = (antd_request_t *)rq_data;
|
antd_request_t *rq = (antd_request_t *)rq_data;
|
||||||
antd_task_t *task = antd_create_task(NULL, (void *)rq, NULL, time(NULL));
|
antd_task_t *task = antd_create_task(NULL, (void *)rq, NULL, time(NULL));
|
||||||
ws_msg_header_t *h = NULL;
|
ws_msg_header_t *h = NULL;
|
||||||
antd_tunnel_msg_t msg;
|
antd_tunnel_msg_t msg;
|
||||||
struct timeval timeout;
|
|
||||||
uint8_t* buffer;
|
uint8_t* buffer;
|
||||||
|
struct timeval timeout;
|
||||||
|
int status;
|
||||||
fd_set fd_in;
|
fd_set fd_in;
|
||||||
int rc, long_value, offset;
|
int long_value, offset;
|
||||||
task->priority++;
|
task->priority++;
|
||||||
int cl_fd = ((antd_client_t *)rq->client)->sock;
|
|
||||||
|
void * argv[1];
|
||||||
if(g_tunnel.initialized == 0)
|
if(g_tunnel.initialized == 0)
|
||||||
{
|
{
|
||||||
ERROR("The tunnel plugin is not initialised correctly");
|
ERROR("The tunnel plugin is not initialised correctly");
|
||||||
@ -765,22 +868,22 @@ void *handle(void *rq_data)
|
|||||||
if (ws_enable(rq->request))
|
if (ws_enable(rq->request))
|
||||||
{
|
{
|
||||||
timeout.tv_sec = 0;
|
timeout.tv_sec = 0;
|
||||||
timeout.tv_usec = 500;
|
timeout.tv_usec = 500; // 5 ms
|
||||||
FD_ZERO(&fd_in);
|
FD_ZERO(&fd_in);
|
||||||
FD_SET(cl_fd, &fd_in);
|
FD_SET(((antd_client_t*)(rq->client))->sock, &fd_in);
|
||||||
rc = select(cl_fd + 1, &fd_in, NULL, NULL, &timeout);
|
status = select(((antd_client_t*)(rq->client))->sock + 1, &fd_in, NULL, NULL, &timeout);
|
||||||
switch (rc)
|
switch (status)
|
||||||
{
|
{
|
||||||
case -1:
|
case -1:
|
||||||
LOG("Error on select(): %s\n", strerror(errno));
|
LOG("Error %d on select()\n", errno);
|
||||||
ws_close(rq->client, 1011);
|
break;
|
||||||
/** TODO: remove all subscriber of this ws connection */
|
case 0:
|
||||||
return task;
|
timeout.tv_sec = 0;
|
||||||
case 0:
|
timeout.tv_usec = 500; // 5 ms
|
||||||
// time out
|
select(0, NULL, NULL, NULL, &timeout);
|
||||||
break;
|
break;
|
||||||
// we have data
|
|
||||||
default:
|
default:
|
||||||
|
argv[0] = (void*) rq->client;
|
||||||
pthread_mutex_lock(&g_tunnel.lock);
|
pthread_mutex_lock(&g_tunnel.lock);
|
||||||
h = ws_read_header(rq->client);
|
h = ws_read_header(rq->client);
|
||||||
pthread_mutex_unlock(&g_tunnel.lock);
|
pthread_mutex_unlock(&g_tunnel.lock);
|
||||||
@ -793,18 +896,18 @@ void *handle(void *rq_data)
|
|||||||
free(h);
|
free(h);
|
||||||
pthread_mutex_lock(&g_tunnel.lock);
|
pthread_mutex_lock(&g_tunnel.lock);
|
||||||
ws_close(rq->client, 1011);
|
ws_close(rq->client, 1011);
|
||||||
|
bst_for_each(g_tunnel.channels, unsubscribe_notify, argv, 1);
|
||||||
pthread_mutex_unlock(&g_tunnel.lock);
|
pthread_mutex_unlock(&g_tunnel.lock);
|
||||||
/** TODO: remove all subscriber of this ws connection */
|
|
||||||
return task;
|
return task;
|
||||||
}
|
}
|
||||||
if (h->opcode == WS_CLOSE)
|
if (h->opcode == WS_CLOSE)
|
||||||
{
|
{
|
||||||
LOG("Websocket: connection closed");
|
LOG("Websocket: connection closed");
|
||||||
pthread_mutex_lock(&g_tunnel.lock);
|
pthread_mutex_lock(&g_tunnel.lock);
|
||||||
ws_close(rq->client, 1011);
|
//ws_close(rq->client, 1011);
|
||||||
|
bst_for_each(g_tunnel.channels, unsubscribe_notify, argv, 1);
|
||||||
pthread_mutex_unlock(&g_tunnel.lock);
|
pthread_mutex_unlock(&g_tunnel.lock);
|
||||||
free(h);
|
free(h);
|
||||||
/** TODO: remove all subscriber of this ws connection */
|
|
||||||
return task;
|
return task;
|
||||||
}
|
}
|
||||||
if (h->opcode == WS_BIN)
|
if (h->opcode == WS_BIN)
|
||||||
@ -815,7 +918,7 @@ void *handle(void *rq_data)
|
|||||||
if(buffer)
|
if(buffer)
|
||||||
{
|
{
|
||||||
pthread_mutex_lock(&g_tunnel.lock);
|
pthread_mutex_lock(&g_tunnel.lock);
|
||||||
rc = ws_read_data(rq->client,h, h->plen, buffer);
|
ws_read_data(rq->client,h, h->plen, buffer);
|
||||||
pthread_mutex_unlock(&g_tunnel.lock);
|
pthread_mutex_unlock(&g_tunnel.lock);
|
||||||
if(h->plen == 0)
|
if(h->plen == 0)
|
||||||
{
|
{
|
||||||
@ -871,9 +974,14 @@ void *handle(void *rq_data)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
return task;
|
||||||
|
}
|
||||||
reschedule_task:
|
reschedule_task:
|
||||||
task->handle = handle;
|
task->handle = handle;
|
||||||
task->type = HEAVY;
|
task->type = HEAVY;
|
||||||
task->access_time = time(NULL);
|
task->access_time = time(NULL);
|
||||||
|
select(0, NULL, NULL, NULL, &timeout);
|
||||||
return task;
|
return task;
|
||||||
}
|
}
|
Loading…
Reference in New Issue
Block a user