1
0
mirror of https://github.com/lxsang/antd-tunnel-plugin synced 2024-11-16 09:48:21 +01:00

allow user identification in publisher

This commit is contained in:
Dany LE 2021-10-27 15:42:02 +02:00
parent 9406c4d33e
commit 6a4682c571
3 changed files with 106 additions and 92 deletions

View File

@ -1,5 +1,5 @@
# initialise autoconf and set up some basic information about the program were packaging # initialise autoconf and set up some basic information about the program were packaging
AC_INIT([tunnel], [0.1.0b], [xsang.le@gmail.com]) AC_INIT([tunnel], [0.1.1b], [xsang.le@gmail.com])
# Were going to use automake for this project # Were going to use automake for this project
# [subdir-objects] if needed # [subdir-objects] if needed

BIN
dist/tunnel-0.1.1b.tar.gz vendored Normal file

Binary file not shown.

196
tunnel.c
View File

@ -19,17 +19,18 @@
#include <fcntl.h> #include <fcntl.h>
#include <sys/time.h> #include <sys/time.h>
#define MAX_CHANNEL_NAME 64 #define MAX_CHANNEL_NAME 64
#define HOT_LINE_SOCKET "antd_hotline.sock" #define HOT_LINE_SOCKET "antd_hotline.sock"
#define KEY_CHAIN_FIFO "antunnel_keychain" #define KEY_CHAIN_FIFO "antunnel_keychain"
#define SOCK_DIR_NAME "channels" #define SOCK_DIR_NAME "channels"
#define COOKIE_NAME "sessionid" #define COOKIE_NAME "sessionid"
#define MAX_CHANNEL_ID 65535u #define MAX_CHANNEL_ID 65535u
#define KEY_LEN 40 #define KEY_LEN 40
#define MAX_SESSION_TIMEOUT (15u*60u) //15 min #define USER_LEN 64
#define PING_INTERVAL 10u // 10s #define MAX_SESSION_TIMEOUT (15u * 60u) //15 min
#define PROCESS_TIMEOUT 30000u //30 ms #define PING_INTERVAL 10u // 10s
#define PROCESS_TIMEOUT 30000u //30 ms
#define MAX_CHANNEL_PATH (sizeof(__plugin__.tmpdir) + strlen(SOCK_DIR_NAME) + strlen(HOT_LINE_SOCKET) + 2) #define MAX_CHANNEL_PATH (sizeof(__plugin__.tmpdir) + strlen(SOCK_DIR_NAME) + strlen(HOT_LINE_SOCKET) + 2)
@ -74,15 +75,18 @@ typedef struct
* |MSG TYPE(1)| CHANNEL ID (2)| CLIENT ID (2)| data(m) | * |MSG TYPE(1)| CHANNEL ID (2)| CLIENT ID (2)| data(m) |
*/ */
typedef struct { typedef struct
{
char hash[KEY_LEN + 1]; // sha1sum + terminal byte char hash[KEY_LEN + 1]; // sha1sum + terminal byte
char user[USER_LEN + 1];
time_t last_update; time_t last_update;
} antd_tunnel_key_t; } antd_tunnel_key_t;
typedef struct { typedef struct
{
pthread_mutex_t lock; pthread_mutex_t lock;
bst_node_t* channels; bst_node_t *channels;
bst_node_t* keychain; bst_node_t *keychain;
pthread_t tid; pthread_t tid;
int hotline; int hotline;
int key_fd; int key_fd;
@ -92,10 +96,10 @@ typedef struct {
static antd_tunnel_t g_tunnel; static antd_tunnel_t g_tunnel;
static int mk_keychain_fifo(const char* name, char * path) static int mk_keychain_fifo(const char *name, char *path)
{ {
// create the FIFO // create the FIFO
(void)snprintf(path, MAX_CHANNEL_PATH,"%s/%s/%s",__plugin__.tmpdir, SOCK_DIR_NAME, name); (void)snprintf(path, MAX_CHANNEL_PATH, "%s/%s/%s", __plugin__.tmpdir, SOCK_DIR_NAME, name);
(void)unlink(path); (void)unlink(path);
if (mkfifo(path, 0666) == -1) if (mkfifo(path, 0666) == -1)
{ {
@ -112,7 +116,7 @@ static int mk_keychain_fifo(const char* name, char * path)
return fifo_fd; return fifo_fd;
} }
static int mk_socket(const char* name, char* path) static int mk_socket(const char *name, char *path)
{ {
struct sockaddr_un address; struct sockaddr_un address;
address.sun_family = AF_UNIX; address.sun_family = AF_UNIX;
@ -477,13 +481,14 @@ static void channel_close(antd_tunnel_channel_t *channel)
} }
static void update_keychain(int listen_fd) static void update_keychain(int listen_fd)
{ {
antd_tunnel_key_t* key_p = (antd_tunnel_key_t*) malloc(sizeof(antd_tunnel_key_t)); antd_tunnel_key_t *key_p = (antd_tunnel_key_t *)malloc(sizeof(antd_tunnel_key_t));
if(key_p == NULL) if (key_p == NULL)
{ {
ERROR("Unable to allocate memory for key"); ERROR("Unable to allocate memory for key");
return; return;
} }
(void)memset(key_p->hash, 0, KEY_LEN + 1); (void)memset(key_p->hash, 0, KEY_LEN + 1);
(void)memset(key_p->user, 0, USER_LEN + 1);
int size; int size;
if ((size = read(listen_fd, key_p->hash, KEY_LEN)) == -1) if ((size = read(listen_fd, key_p->hash, KEY_LEN)) == -1)
{ {
@ -491,27 +496,33 @@ static void update_keychain(int listen_fd)
free(key_p); free(key_p);
return; return;
} }
if(size != KEY_LEN) if (size != KEY_LEN)
{ {
ERROR("Invalid key size %d", size); ERROR("Invalid key size %d", size);
free(key_p); free(key_p);
return; return;
} }
if ((size = read(listen_fd, key_p->user, USER_LEN)) == -1)
{
ERROR("Unable to read user from keychain FIFO: %s", strerror(errno));
free(key_p);
return;
}
// looking for key in the keychain // looking for key in the keychain
int hash_val = simple_hash(key_p->hash); int hash_val = simple_hash(key_p->hash);
pthread_mutex_lock(&g_tunnel.lock); pthread_mutex_lock(&g_tunnel.lock);
bst_node_t* node = bst_find(g_tunnel.keychain, hash_val); bst_node_t *node = bst_find(g_tunnel.keychain, hash_val);
if(node == NULL) if (node == NULL)
{ {
key_p->last_update = time(NULL); key_p->last_update = time(NULL);
g_tunnel.keychain = bst_insert(g_tunnel.keychain, hash_val, (void*) key_p); g_tunnel.keychain = bst_insert(g_tunnel.keychain, hash_val, (void *)key_p);
LOG("New key added to the keychain (%d)", hash_val); LOG("New key added to the keychain (%d) for user", hash_val, key_p->user);
} }
else else
{ {
antd_tunnel_key_t* existing_key = (antd_tunnel_key_t*)node->data; antd_tunnel_key_t *existing_key = (antd_tunnel_key_t *)node->data;
existing_key->last_update = time(NULL); existing_key->last_update = time(NULL);
LOG("Update existing key in the keychain"); LOG("Update existing key in the keychain for user %s", existing_key->user);
free(key_p); free(key_p);
} }
pthread_mutex_unlock(&g_tunnel.lock); pthread_mutex_unlock(&g_tunnel.lock);
@ -692,7 +703,7 @@ static void *multiplex(void *data_p)
FD_ZERO(&fd_in); FD_ZERO(&fd_in);
FD_SET(tunnel_p->hotline, &fd_in); FD_SET(tunnel_p->hotline, &fd_in);
FD_SET(tunnel_p->key_fd, &fd_in); FD_SET(tunnel_p->key_fd, &fd_in);
max_fdm = tunnel_p->hotline > tunnel_p->key_fd? tunnel_p->hotline: tunnel_p->key_fd; max_fdm = tunnel_p->hotline > tunnel_p->key_fd ? tunnel_p->hotline : tunnel_p->key_fd;
pthread_mutex_lock(&tunnel_p->lock); pthread_mutex_lock(&tunnel_p->lock);
args[0] = (void *)&fd_in; args[0] = (void *)&fd_in;
args[1] = (void *)&max_fdm; args[1] = (void *)&max_fdm;
@ -700,36 +711,36 @@ static void *multiplex(void *data_p)
pthread_mutex_unlock(&tunnel_p->lock); pthread_mutex_unlock(&tunnel_p->lock);
rc = select(max_fdm + 1, &fd_in, NULL, NULL, NULL); rc = select(max_fdm + 1, &fd_in, NULL, NULL, NULL);
switch (rc) switch (rc)
{ {
case -1: case -1:
LOG("Error %d on select()\n", errno); LOG("Error %d on select()\n", errno);
status = 1; status = 1;
break; break;
case 0: case 0:
break; break;
// we have data // we have data
default: default:
if(FD_ISSET(tunnel_p->hotline, &fd_in)) if (FD_ISSET(tunnel_p->hotline, &fd_in))
{ {
// LOG("Got new data on hotline"); // LOG("Got new data on hotline");
monitor_hotline(tunnel_p->hotline); monitor_hotline(tunnel_p->hotline);
} }
if(FD_ISSET(tunnel_p->key_fd, &fd_in)) if (FD_ISSET(tunnel_p->key_fd, &fd_in))
{ {
update_keychain(tunnel_p->key_fd); update_keychain(tunnel_p->key_fd);
} }
pthread_mutex_lock(&tunnel_p->lock); pthread_mutex_lock(&tunnel_p->lock);
closed_channels = list_init(); closed_channels = list_init();
args[0] = (void*) &fd_in; args[0] = (void *)&fd_in;
args[1] = (void*) &closed_channels; args[1] = (void *)&closed_channels;
bst_for_each(tunnel_p->channels, handle_channel,args, 2); bst_for_each(tunnel_p->channels, handle_channel, args, 2);
list_for_each(item, closed_channels) list_for_each(item, closed_channels)
{ {
tunnel_p->channels = bst_delete(tunnel_p->channels, ((bst_node_t*)item->value.ptr)->key); tunnel_p->channels = bst_delete(tunnel_p->channels, ((bst_node_t *)item->value.ptr)->key);
item->value.ptr = NULL; item->value.ptr = NULL;
} }
list_free(&closed_channels); list_free(&closed_channels);
pthread_mutex_unlock(&tunnel_p->lock); pthread_mutex_unlock(&tunnel_p->lock);
} }
} }
return NULL; return NULL;
@ -747,13 +758,13 @@ void init()
g_tunnel.initialized = 0; g_tunnel.initialized = 0;
g_tunnel.keychain = NULL; g_tunnel.keychain = NULL;
g_tunnel.key_fd = -1; g_tunnel.key_fd = -1;
if((g_tunnel.hotline = mk_socket(HOT_LINE_SOCKET, path)) == -1) if ((g_tunnel.hotline = mk_socket(HOT_LINE_SOCKET, path)) == -1)
{ {
ERROR("Unable to create hotline socket"); ERROR("Unable to create hotline socket");
destroy(); destroy();
return; return;
} }
if((g_tunnel.key_fd = mk_keychain_fifo(KEY_CHAIN_FIFO, path)) == -1) if ((g_tunnel.key_fd = mk_keychain_fifo(KEY_CHAIN_FIFO, path)) == -1)
{ {
ERROR("Unable to create keychain FIFO"); ERROR("Unable to create keychain FIFO");
destroy(); destroy();
@ -780,7 +791,7 @@ static void free_subscribers(bst_node_t *node, void **args, int argc)
void destroy() void destroy()
{ {
char path[MAX_CHANNEL_PATH]; char path[MAX_CHANNEL_PATH];
if(g_tunnel.initialized) if (g_tunnel.initialized)
{ {
pthread_mutex_lock(&g_tunnel.lock); pthread_mutex_lock(&g_tunnel.lock);
bst_for_each(g_tunnel.channels, free_subscribers, NULL, 0); bst_for_each(g_tunnel.channels, free_subscribers, NULL, 0);
@ -791,20 +802,20 @@ void destroy()
pthread_mutex_destroy(&g_tunnel.lock); pthread_mutex_destroy(&g_tunnel.lock);
LOG("Antd tunnel is destroyed"); LOG("Antd tunnel is destroyed");
} }
if(g_tunnel.hotline != -1) if (g_tunnel.hotline != -1)
{ {
(void) close(g_tunnel.hotline); (void)close(g_tunnel.hotline);
(void) snprintf(path, BUFFLEN, "%s/%s/%s", __plugin__.tmpdir, SOCK_DIR_NAME, HOT_LINE_SOCKET); (void)snprintf(path, BUFFLEN, "%s/%s/%s", __plugin__.tmpdir, SOCK_DIR_NAME, HOT_LINE_SOCKET);
(void) unlink(path); (void)unlink(path);
} }
if(g_tunnel.key_fd != -1) if (g_tunnel.key_fd != -1)
{ {
(void) close(g_tunnel.key_fd); (void)close(g_tunnel.key_fd);
(void) snprintf(path, BUFFLEN, "%s/%s/%s", __plugin__.tmpdir, SOCK_DIR_NAME, KEY_CHAIN_FIFO); (void)snprintf(path, BUFFLEN, "%s/%s/%s", __plugin__.tmpdir, SOCK_DIR_NAME, KEY_CHAIN_FIFO);
(void) unlink(path); (void)unlink(path);
} }
} }
static void process_client_message(antd_tunnel_msg_t *msg, antd_client_t *client) static void process_client_message(antd_tunnel_msg_t *msg, antd_client_t *client, antd_tunnel_key_t * key)
{ {
char buff[BUFFLEN + 1]; char buff[BUFFLEN + 1];
bst_node_t *node; bst_node_t *node;
@ -886,6 +897,9 @@ static void process_client_message(antd_tunnel_msg_t *msg, antd_client_t *client
ERROR("Unable to send subscribe OK message to client"); ERROR("Unable to send subscribe OK message to client");
} }
msg->header.client_id = g_tunnel.id_allocator; msg->header.client_id = g_tunnel.id_allocator;
msg->header.size = strlen(key->user) + 1;
(void)memset(buff,0, BUFFLEN + 1);
(void)memcpy(buff, key->user, msg->header.size - 1);
msg->header.type = CHANNEL_SUBSCRIBE; msg->header.type = CHANNEL_SUBSCRIBE;
} }
else else
@ -986,15 +1000,15 @@ static void keychain_validating(bst_node_t *node, void **argv, int argc)
{ {
(void)argc; (void)argc;
list_t *list = (list_t *)argv[0]; list_t *list = (list_t *)argv[0];
antd_tunnel_key_t* key_p = NULL; antd_tunnel_key_t *key_p = NULL;
if(node == NULL || node->data == NULL) if (node == NULL || node->data == NULL)
{ {
return; return;
} }
key_p = (antd_tunnel_key_t*) node->data; key_p = (antd_tunnel_key_t *)node->data;
if(difftime(time(NULL), key_p->last_update) > (double)MAX_SESSION_TIMEOUT) if (difftime(time(NULL), key_p->last_update) > (double)MAX_SESSION_TIMEOUT)
{ {
list_put_i(list,node->key); list_put_i(list, node->key);
} }
} }
@ -1010,9 +1024,9 @@ void *handle(void *rq_data)
int status; int status;
fd_set fd_in; fd_set fd_in;
int offset; int offset;
bst_node_t * node = NULL; bst_node_t *node = NULL;
antd_tunnel_key_t* key_p = NULL; antd_tunnel_key_t *key_p = NULL;
const char* ssid = NULL; const char *ssid = NULL;
dictionary_t cookie = NULL; dictionary_t cookie = NULL;
void *argv[1]; void *argv[1];
@ -1024,7 +1038,7 @@ void *handle(void *rq_data)
// update the keychain // update the keychain
list_t list = list_init(); list_t list = list_init();
argv[0] = (void*)&list; argv[0] = (void *)&list;
pthread_mutex_lock(&g_tunnel.lock); pthread_mutex_lock(&g_tunnel.lock);
bst_for_each(g_tunnel.keychain, keychain_validating, argv, 1); bst_for_each(g_tunnel.keychain, keychain_validating, argv, 1);
pthread_mutex_unlock(&g_tunnel.lock); pthread_mutex_unlock(&g_tunnel.lock);
@ -1043,19 +1057,19 @@ void *handle(void *rq_data)
argv[0] = (void *)rq->client; argv[0] = (void *)rq->client;
// verify if user is authorized // verify if user is authorized
cookie = dvalue(rq->request, "COOKIE"); cookie = dvalue(rq->request, "COOKIE");
if(cookie != NULL) if (cookie != NULL)
{ {
ssid = (const char*)dvalue(cookie, COOKIE_NAME); ssid = (const char *)dvalue(cookie, COOKIE_NAME);
} }
if(ssid == NULL) if (ssid == NULL)
{ {
return task; return task;
} }
pthread_mutex_lock(&g_tunnel.lock); pthread_mutex_lock(&g_tunnel.lock);
node = bst_find(g_tunnel.keychain, simple_hash(ssid)); node = bst_find(g_tunnel.keychain, simple_hash(ssid));
pthread_mutex_unlock(&g_tunnel.lock); pthread_mutex_unlock(&g_tunnel.lock);
if(node == NULL || node->data == NULL || strcmp(((antd_tunnel_key_t*)node->data)->hash,ssid) != 0) if (node == NULL || node->data == NULL || strcmp(((antd_tunnel_key_t *)node->data)->hash, ssid) != 0)
{ {
ERROR("User unauthorized, quit"); ERROR("User unauthorized, quit");
pthread_mutex_lock(&g_tunnel.lock); pthread_mutex_lock(&g_tunnel.lock);
@ -1063,7 +1077,7 @@ void *handle(void *rq_data)
pthread_mutex_unlock(&g_tunnel.lock); pthread_mutex_unlock(&g_tunnel.lock);
return task; return task;
} }
key_p = (antd_tunnel_key_t*) node->data; key_p = (antd_tunnel_key_t *)node->data;
pthread_mutex_lock(&g_tunnel.lock); pthread_mutex_lock(&g_tunnel.lock);
key_p->last_update = time(NULL); key_p->last_update = time(NULL);
pthread_mutex_unlock(&g_tunnel.lock); pthread_mutex_unlock(&g_tunnel.lock);
@ -1139,7 +1153,7 @@ void *handle(void *rq_data)
(void)memcpy(&msg.header.client_id, buffer + offset, sizeof(msg.header.client_id)); (void)memcpy(&msg.header.client_id, buffer + offset, sizeof(msg.header.client_id));
offset += sizeof(msg.header.client_id); offset += sizeof(msg.header.client_id);
if(offset > (int)ws_msg_len) if (offset > (int)ws_msg_len)
{ {
ERROR("Invalid message format"); ERROR("Invalid message format");
return task; return task;
@ -1150,13 +1164,13 @@ void *handle(void *rq_data)
msg.data = buffer + offset; msg.data = buffer + offset;
// now we have the message // now we have the message
pthread_mutex_lock(&g_tunnel.lock); pthread_mutex_lock(&g_tunnel.lock);
process_client_message(&msg, rq->client); process_client_message(&msg, rq->client, key_p);
pthread_mutex_unlock(&g_tunnel.lock); pthread_mutex_unlock(&g_tunnel.lock);
} }
free(buffer); free(buffer);
} }
} }
else if(h->opcode == WS_PONG) else if (h->opcode == WS_PONG)
{ {
buffer = (uint8_t *)malloc(h->plen + 1); buffer = (uint8_t *)malloc(h->plen + 1);
if (buffer) if (buffer)
@ -1205,7 +1219,7 @@ void *handle(void *rq_data)
ERROR("Unable to ping client, close the connection: %d", client->sock); ERROR("Unable to ping client, close the connection: %d", client->sock);
return task; return task;
}*/ }*/
if(ws_ping(client,"ANTD-TUNNEL",0) != 0) if (ws_ping(client, "ANTD-TUNNEL", 0) != 0)
{ {
// close the connection // close the connection
pthread_mutex_lock(&g_tunnel.lock); pthread_mutex_lock(&g_tunnel.lock);
@ -1223,6 +1237,6 @@ void *handle(void *rq_data)
} }
task->handle = handle; task->handle = handle;
task->access_time = time(NULL); task->access_time = time(NULL);
antd_task_bind_event(task,rq->client->sock,0, TASK_EVT_ON_WRITABLE| TASK_EVT_ON_READABLE); antd_task_bind_event(task, rq->client->sock, 0, TASK_EVT_ON_WRITABLE | TASK_EVT_ON_READABLE);
return task; return task;
} }