diff --git a/configure.ac b/configure.ac index a75925c..0e38ae9 100644 --- a/configure.ac +++ b/configure.ac @@ -1,5 +1,5 @@ # initialise autoconf and set up some basic information about the program we’re packaging -AC_INIT([tunnel], [0.1.3b], [xsang.le@gmail.com]) +AC_INIT([tunnel], [0.2.0], [xsang.le@gmail.com]) # We’re going to use automake for this project # [subdir-objects] if needed diff --git a/tunnel.c b/tunnel.c index d97ab85..19950f9 100644 --- a/tunnel.c +++ b/tunnel.c @@ -1,4 +1,3 @@ -#define PLUGIN_IMPLEMENT 1 #include #include #include @@ -6,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -101,8 +101,6 @@ typedef struct uint8_t initialized; } antd_tunnel_t; -static antd_tunnel_t g_tunnel; - static int mk_un_socket(antd_tunnel_hotline_t *line) { struct sockaddr_un address; @@ -421,7 +419,7 @@ static void destroy_channel(antd_tunnel_channel_t *channel) bst_free(channel->subscribers); free(channel); } -static void channel_open(int fd, const char *name) +static void channel_open(antd_tunnel_t* tunnel_conf, int fd, const char *name) { char buffer[BUFFLEN]; antd_tunnel_channel_t *channel = NULL; @@ -432,11 +430,11 @@ static void channel_open(int fd, const char *name) msg.header.channel_id = 0; msg.header.client_id = 0; // look if the channel is already opened - if (g_tunnel.channels != NULL) + if (tunnel_conf->channels != NULL) { - pthread_mutex_lock(&g_tunnel.lock); - node = bst_find(g_tunnel.channels, hash_val); - pthread_mutex_unlock(&g_tunnel.lock); + pthread_mutex_lock(&tunnel_conf->lock); + node = bst_find(tunnel_conf->channels, hash_val); + pthread_mutex_unlock(&tunnel_conf->lock); if (node != NULL) { channel = (antd_tunnel_channel_t *)node->data; @@ -482,9 +480,9 @@ static void channel_open(int fd, const char *name) ERROR("Unable to write message to hotline (%d)", fd); } // channel created - pthread_mutex_lock(&g_tunnel.lock); - g_tunnel.channels = bst_insert(g_tunnel.channels, hash_val, (void *)channel); - pthread_mutex_unlock(&g_tunnel.lock); + pthread_mutex_lock(&tunnel_conf->lock); + tunnel_conf->channels = bst_insert(tunnel_conf->channels, hash_val, (void *)channel); + pthread_mutex_unlock(&tunnel_conf->lock); if(strncmp(name, KEYCHAIN_CHANNEL, strlen(KEYCHAIN_CHANNEL)) == 0) { // subscribe to it as special client @@ -498,14 +496,14 @@ static void channel_open(int fd, const char *name) } } -static void channel_close(antd_tunnel_channel_t *channel) +static void channel_close(antd_tunnel_t* tunnel_conf, antd_tunnel_channel_t *channel) { antd_tunnel_msg_t msg; msg.data = NULL; msg.header.channel_id = 0; msg.header.client_id = 0; // look for the channel - if (g_tunnel.channels != NULL) + if (tunnel_conf->channels != NULL) { msg.header.channel_id = msg.header.channel_id; if (channel != NULL) @@ -521,7 +519,7 @@ static void channel_close(antd_tunnel_channel_t *channel) } } } -static void update_keychain(antd_tunnel_msg_t *msg) +static void update_keychain(antd_tunnel_t* tunnel_conf, antd_tunnel_msg_t *msg) { if(msg->header.size <= KEY_LEN) { @@ -546,12 +544,12 @@ static void update_keychain(antd_tunnel_msg_t *msg) // looking for key in the keychain int hash_val = simple_hash(key_p->hash); - bst_node_t *node = bst_find(g_tunnel.keychain, hash_val); + bst_node_t *node = bst_find(tunnel_conf->keychain, hash_val); if (node == NULL) { key_p->last_update = time(NULL); - g_tunnel.keychain = bst_insert(g_tunnel.keychain, hash_val, (void *)key_p); - LOG("New key added to the keychain (%d) for user", hash_val, key_p->user); + tunnel_conf->keychain = bst_insert(tunnel_conf->keychain, hash_val, (void *)key_p); + LOG("New key added to the keychain (%d) for user: %s", hash_val, key_p->user); } else { @@ -561,7 +559,7 @@ static void update_keychain(antd_tunnel_msg_t *msg) free(key_p); } } -static void monitor_hotline(int listen_fd) +static void monitor_hotline(antd_tunnel_t* tunnel_conf, int listen_fd) { char buff[MAX_CHANNEL_NAME]; antd_tunnel_msg_t msg; @@ -601,7 +599,7 @@ static void monitor_hotline(int listen_fd) (void)memcpy(buff, msg.data, msg.header.size); buff[msg.header.size] = '\0'; LOG("Open a new channel: %s (%d)", buff, fd); - channel_open(fd, buff); + channel_open(tunnel_conf,fd, buff); if (msg.data) free(msg.data); } @@ -627,11 +625,12 @@ static void handle_hotline(bst_node_t *node, void **args, int argc) { (void)argc; fd_set *fd_in = (fd_set *)args[0]; + antd_tunnel_t* tunnel_conf = (antd_tunnel_t*) args[2]; antd_tunnel_hotline_t *line = (antd_tunnel_hotline_t *)node->data; if (FD_ISSET(line->sock, fd_in)) { LOG("Got new data on hotline %d", line->sock); - monitor_hotline(line->sock); + monitor_hotline(tunnel_conf,line->sock); } } @@ -641,6 +640,7 @@ static void handle_channel(bst_node_t *node, void **args, int argc) (void)argc; fd_set *fd_in = (fd_set *)args[0]; list_t *channel_list = (list_t *)args[1]; + antd_tunnel_t* tunnel_conf = (antd_tunnel_t*) args[2]; antd_tunnel_channel_t *channel = (antd_tunnel_channel_t *)node->data; bst_node_t *client; antd_client_t *rq; @@ -678,7 +678,7 @@ static void handle_channel(bst_node_t *node, void **args, int argc) if(msg.header.type == CHANNEL_DATA) { // update keychain - update_keychain(&msg); + update_keychain( tunnel_conf, &msg); } else { @@ -725,7 +725,7 @@ static void handle_channel(bst_node_t *node, void **args, int argc) case CHANNEL_CLOSE: // close the current channel - channel_close(channel); + channel_close(tunnel_conf,channel); node->data = NULL; list_put_ptr(channel_list, node); break; @@ -774,7 +774,7 @@ static void *multiplex(void *data_p) fd_set fd_in; int status = 0; int rc; - void *args[2]; + void *args[3]; list_t closed_channels; item_t item; antd_tunnel_t *tunnel_p = (antd_tunnel_t *)data_p; @@ -785,6 +785,7 @@ static void *multiplex(void *data_p) pthread_mutex_lock(&tunnel_p->lock); args[0] = (void *)&fd_in; args[1] = (void *)&max_fdm; + args[2] = (void*) tunnel_p; bst_for_each(tunnel_p->channels, set_channel_sock_fd, args, 2); bst_for_each(tunnel_p->hotlines, set_hotline_sock_fd, args, 2); pthread_mutex_unlock(&tunnel_p->lock); @@ -801,11 +802,11 @@ static void *multiplex(void *data_p) default: args[0] = (void *)&fd_in; args[1] = (void *)&closed_channels; - bst_for_each(tunnel_p->hotlines, handle_hotline, args, 1); + bst_for_each(tunnel_p->hotlines, handle_hotline, args, 3); pthread_mutex_lock(&tunnel_p->lock); closed_channels = list_init(); - bst_for_each(tunnel_p->channels, handle_channel, args, 2); + bst_for_each(tunnel_p->channels, handle_channel, args, 3); list_for_each(item, closed_channels) { tunnel_p->channels = bst_delete(tunnel_p->channels, ((bst_node_t *)item->value.ptr)->key); @@ -818,22 +819,23 @@ static void *multiplex(void *data_p) return NULL; } -static int init_hotlines() +static int init_hotlines(antd_tunnel_t*tunnel_conf,antd_plugin_ctx_t* ctx) { char *tmp; regmatch_t regex_matches[3]; + dictionary_t config = antd_plugin_config(ctx); // read plugin configuration - if (!__plugin__.config) + if (!config) { - PLUGIN_PANIC("No plugin configuration found. Please specify it in server config file"); + PLUGIN_PANIC(ctx, "No plugin configuration found. Please specify it in server config file"); return -1; } - tmp = (char *)dvalue(__plugin__.config, "hotlines"); + tmp = (char *)dvalue(config, "hotlines"); // the hotlines can be multiple UNIX or TCP sockets separated by comma // e.g antd plugin config: hotlines = unix:/tmp/hotline.sock,192.168.1.71:5533 if (!tmp) { - PLUGIN_PANIC("No hotlines configuration found"); + PLUGIN_PANIC(ctx,"No hotlines configuration found"); return -1; } // split the configuration by , @@ -887,43 +889,20 @@ static int init_hotlines() } else { - g_tunnel.hotlines = bst_insert(g_tunnel.hotlines, line->sock, (void *)line); + tunnel_conf->hotlines = bst_insert(tunnel_conf->hotlines, line->sock, (void *)line); } } } list_free(&socket_list); // panic if not hotline - if (!g_tunnel.hotlines) + if (!tunnel_conf->hotlines) { - PLUGIN_PANIC("Unable to initialize tunnel hotlines"); + PLUGIN_PANIC(ctx, "Unable to initialize tunnel hotlines"); return -1; } return 0; } -void init() -{ - // initialise the channel - g_tunnel.hotlines = NULL; - g_tunnel.channels = NULL; - g_tunnel.id_allocator = 0; - g_tunnel.initialized = 0; - g_tunnel.keychain = NULL; - if (init_hotlines() != 0) - return; - // initialise the lock - (void)pthread_mutex_init(&g_tunnel.lock, NULL); - - // create the thread - if (pthread_create(&g_tunnel.tid, NULL, (void *(*)(void *))multiplex, (void *)&g_tunnel) != 0) - { - ERROR("pthread_create: cannot create tunnel multiplex thread: %s\n", strerror(errno)); - destroy(); - } - LOG("Tunnel plugin initialised"); - g_tunnel.initialized = 1; -} - static void free_subscribers(bst_node_t *node, void **args, int argc) { (void)argc; @@ -943,23 +922,68 @@ static void free_hotlines(bst_node_t *node, void **args, int argc) (void)free(node->data); node->data = NULL; } -void destroy() + +void destroy(antd_tunnel_t* tunnel_conf) { - if (g_tunnel.initialized) + if (tunnel_conf && tunnel_conf->initialized) { - pthread_mutex_lock(&g_tunnel.lock); - bst_for_each(g_tunnel.channels, free_subscribers, NULL, 0); - bst_for_each(g_tunnel.hotlines, free_hotlines, NULL, 0); - pthread_mutex_unlock(&g_tunnel.lock); - (void)pthread_join(g_tunnel.tid, NULL); - bst_free(g_tunnel.channels); - bst_free(g_tunnel.keychain); - bst_free(g_tunnel.hotlines); - pthread_mutex_destroy(&g_tunnel.lock); + pthread_mutex_lock(&tunnel_conf->lock); + (void)pthread_cancel(tunnel_conf->tid); + (void)pthread_join(tunnel_conf->tid, NULL); + bst_for_each(tunnel_conf->channels, free_subscribers, NULL, 0); + bst_for_each(tunnel_conf->hotlines, free_hotlines, NULL, 0); + pthread_mutex_unlock(&tunnel_conf->lock); + bst_free(tunnel_conf->channels); + bst_free(tunnel_conf->keychain); + bst_free(tunnel_conf->hotlines); + pthread_mutex_destroy(&tunnel_conf->lock); LOG("Antd tunnel is destroyed"); } } -static void process_client_message(antd_tunnel_msg_t *msg, antd_client_t *client, antd_tunnel_key_t *key) + +void* create(antd_plugin_ctx_t* ctx) +{ + antd_tunnel_t* tunnel_conf = (antd_tunnel_t*) malloc(sizeof(antd_tunnel_t)); + if(!tunnel_conf) + { + PLUGIN_PANIC(ctx, "Unable to allocate memory for plugin context"); + return NULL; + } + // initialise the channel + tunnel_conf->hotlines = NULL; + tunnel_conf->channels = NULL; + tunnel_conf->id_allocator = 0; + tunnel_conf->initialized = 0; + tunnel_conf->keychain = NULL; + if (init_hotlines(tunnel_conf, ctx) != 0) + { + free(tunnel_conf); + return NULL; + } + // initialise the lock + (void)pthread_mutex_init(&tunnel_conf->lock, NULL); + + // create the thread + if (pthread_create(&tunnel_conf->tid, NULL, (void *(*)(void *))multiplex, (void *)tunnel_conf) != 0) + { + PLUGIN_PANIC(ctx, "pthread_create: cannot create tunnel multiplex thread: %s\n", strerror(errno)); + free(tunnel_conf); + return NULL; + } + int old_state; + (void)pthread_setcancelstate(tunnel_conf->tid, &old_state); + LOG("Tunnel plugin initialised"); + tunnel_conf->initialized = 1; + return tunnel_conf; +} + +void drop(antd_plugin_ctx_t* ctx) +{ + antd_tunnel_t * tunnel_conf = (antd_tunnel_t *) antd_plugin_data(ctx); + destroy(tunnel_conf); +} + +static void process_client_message(antd_tunnel_t* tunnel_conf, antd_tunnel_msg_t *msg, antd_client_t *client, antd_tunnel_key_t *key) { char buff[BUFFLEN + 1]; bst_node_t *node; @@ -973,7 +997,7 @@ static void process_client_message(antd_tunnel_msg_t *msg, antd_client_t *client case CHANNEL_ERROR: case CHANNEL_DATA: case CHANNEL_CTRL: - node = bst_find(g_tunnel.channels, msg->header.channel_id); + node = bst_find(tunnel_conf->channels, msg->header.channel_id); if (node) { channel = (antd_tunnel_channel_t *)node->data; @@ -1035,7 +1059,7 @@ static void process_client_message(antd_tunnel_msg_t *msg, antd_client_t *client { hash_val = msg->header.channel_id; } - node = bst_find(g_tunnel.channels, hash_val); + node = bst_find(tunnel_conf->channels, hash_val); if (node) { channel = (antd_tunnel_channel_t *)node->data; @@ -1043,20 +1067,20 @@ static void process_client_message(antd_tunnel_msg_t *msg, antd_client_t *client { if (msg->header.type == CHANNEL_SUBSCRIBE) { - g_tunnel.id_allocator++; - channel->subscribers = bst_insert(channel->subscribers, (int)g_tunnel.id_allocator, client); + tunnel_conf->id_allocator++; + channel->subscribers = bst_insert(channel->subscribers, (int)tunnel_conf->id_allocator, client); // sent ok to client msg->header.type = CHANNEL_OK; msg->header.channel_id = hash_val; - msg->header.size = sizeof(g_tunnel.id_allocator); - net16 = htons(g_tunnel.id_allocator); - (void)memcpy(buff, &net16, sizeof(g_tunnel.id_allocator)); + msg->header.size = sizeof(tunnel_conf->id_allocator); + net16 = htons(tunnel_conf->id_allocator); + (void)memcpy(buff, &net16, sizeof(tunnel_conf->id_allocator)); msg->data = (uint8_t *)buff; if (write_msg_to_client(msg, client) != 0) { ERROR("Unable to send subscribe OK message to client"); } - msg->header.client_id = g_tunnel.id_allocator; + msg->header.client_id = tunnel_conf->id_allocator; msg->header.size = strlen(key->user) + 1; (void)memset(buff, 0, BUFFLEN + 1); (void)memcpy(buff, key->user, msg->header.size - 1); @@ -1176,6 +1200,8 @@ void *handle(void *rq_data) { antd_request_t *rq = (antd_request_t *)rq_data; antd_client_t *client = ((antd_client_t *)(rq->client)); + antd_plugin_ctx_t* ctx = rq->context; + antd_tunnel_t* tunnel_conf = (antd_tunnel_t*) antd_plugin_data(ctx); antd_task_t *task = antd_create_task(NULL, (void *)rq, NULL, time(NULL)); ws_msg_header_t *h = NULL; antd_tunnel_msg_t msg; @@ -1190,25 +1216,25 @@ void *handle(void *rq_data) dictionary_t cookie = NULL; void *argv[1]; - if (g_tunnel.initialized == 0) + if (!tunnel_conf || tunnel_conf->initialized == 0) { - ERROR("The tunnel plugin is not initialised correctly"); + PLUGIN_PANIC(ctx,"The tunnel plugin is not initialised correctly"); return task; } // update the keychain list_t list = list_init(); argv[0] = (void *)&list; - pthread_mutex_lock(&g_tunnel.lock); - bst_for_each(g_tunnel.keychain, keychain_validating, argv, 1); - pthread_mutex_unlock(&g_tunnel.lock); + pthread_mutex_lock(&tunnel_conf->lock); + bst_for_each(tunnel_conf->keychain, keychain_validating, argv, 1); + pthread_mutex_unlock(&tunnel_conf->lock); item_t item; list_for_each(item, list) { - pthread_mutex_lock(&g_tunnel.lock); - g_tunnel.keychain = bst_delete(g_tunnel.keychain, item->value.i); + pthread_mutex_lock(&tunnel_conf->lock); + tunnel_conf->keychain = bst_delete(tunnel_conf->keychain, item->value.i); LOG("Delete invalid key (timeout) with hash %d", item->value.i); - pthread_mutex_unlock(&g_tunnel.lock); + pthread_mutex_unlock(&tunnel_conf->lock); } list_free(&list); @@ -1226,21 +1252,21 @@ void *handle(void *rq_data) return task; } - pthread_mutex_lock(&g_tunnel.lock); - node = bst_find(g_tunnel.keychain, simple_hash(ssid)); - pthread_mutex_unlock(&g_tunnel.lock); + pthread_mutex_lock(&tunnel_conf->lock); + node = bst_find(tunnel_conf->keychain, simple_hash(ssid)); + pthread_mutex_unlock(&tunnel_conf->lock); if (node == NULL || node->data == NULL || strcmp(((antd_tunnel_key_t *)node->data)->hash, ssid) != 0) { ERROR("User unauthorized, quit"); - pthread_mutex_lock(&g_tunnel.lock); - bst_for_each(g_tunnel.channels, unsubscribe_notify, argv, 1); - pthread_mutex_unlock(&g_tunnel.lock); + pthread_mutex_lock(&tunnel_conf->lock); + bst_for_each(tunnel_conf->channels, unsubscribe_notify, argv, 1); + pthread_mutex_unlock(&tunnel_conf->lock); return task; } key_p = (antd_tunnel_key_t *)node->data; - pthread_mutex_lock(&g_tunnel.lock); + pthread_mutex_lock(&tunnel_conf->lock); key_p->last_update = time(NULL); - pthread_mutex_unlock(&g_tunnel.lock); + pthread_mutex_unlock(&tunnel_conf->lock); // session is valid, continue timeout.tv_sec = 0; timeout.tv_usec = PROCESS_TIMEOUT; @@ -1251,9 +1277,9 @@ void *handle(void *rq_data) { case -1: ERROR("Error on poll(): %s", strerror(errno)); - pthread_mutex_lock(&g_tunnel.lock); - bst_for_each(g_tunnel.channels, unsubscribe_notify, argv, 1); - pthread_mutex_unlock(&g_tunnel.lock); + pthread_mutex_lock(&tunnel_conf->lock); + bst_for_each(tunnel_conf->channels, unsubscribe_notify, argv, 1); + pthread_mutex_unlock(&tunnel_conf->lock); return task; break; case 0: @@ -1265,15 +1291,15 @@ void *handle(void *rq_data) if (pfd.revents & (POLLERR | POLLHUP)) { ERROR("POLLHUP or POLLERR found"); - pthread_mutex_lock(&g_tunnel.lock); - bst_for_each(g_tunnel.channels, unsubscribe_notify, argv, 1); - pthread_mutex_unlock(&g_tunnel.lock); + pthread_mutex_lock(&tunnel_conf->lock); + bst_for_each(tunnel_conf->channels, unsubscribe_notify, argv, 1); + pthread_mutex_unlock(&tunnel_conf->lock); return task; break; } - pthread_mutex_lock(&g_tunnel.lock); + pthread_mutex_lock(&tunnel_conf->lock); h = ws_read_header(rq->client); - pthread_mutex_unlock(&g_tunnel.lock); + pthread_mutex_unlock(&tunnel_conf->lock); if (h) { if (h->mask == 0) @@ -1281,18 +1307,18 @@ void *handle(void *rq_data) LOG("Data is not mask"); // kill the child process free(h); - pthread_mutex_lock(&g_tunnel.lock); + pthread_mutex_lock(&tunnel_conf->lock); ws_close(rq->client, 1011); - bst_for_each(g_tunnel.channels, unsubscribe_notify, argv, 1); - pthread_mutex_unlock(&g_tunnel.lock); + bst_for_each(tunnel_conf->channels, unsubscribe_notify, argv, 1); + pthread_mutex_unlock(&tunnel_conf->lock); return task; } if (h->opcode == WS_CLOSE) { LOG("Websocket: connection closed"); - pthread_mutex_lock(&g_tunnel.lock); - bst_for_each(g_tunnel.channels, unsubscribe_notify, argv, 1); - pthread_mutex_unlock(&g_tunnel.lock); + pthread_mutex_lock(&tunnel_conf->lock); + bst_for_each(tunnel_conf->channels, unsubscribe_notify, argv, 1); + pthread_mutex_unlock(&tunnel_conf->lock); free(h); return task; } @@ -1304,9 +1330,9 @@ void *handle(void *rq_data) buffer = (uint8_t *)malloc(h->plen + 1); if (buffer) { - pthread_mutex_lock(&g_tunnel.lock); + pthread_mutex_lock(&tunnel_conf->lock); ws_read_data(rq->client, h, h->plen, buffer); - pthread_mutex_unlock(&g_tunnel.lock); + pthread_mutex_unlock(&tunnel_conf->lock); if (h->plen == 0) { offset = 0; @@ -1334,9 +1360,9 @@ void *handle(void *rq_data) // data msg.data = buffer + offset; // now we have the message - pthread_mutex_lock(&g_tunnel.lock); - process_client_message(&msg, rq->client, key_p); - pthread_mutex_unlock(&g_tunnel.lock); + pthread_mutex_lock(&tunnel_conf->lock); + process_client_message(tunnel_conf,&msg, rq->client, key_p); + pthread_mutex_unlock(&tunnel_conf->lock); } free(buffer); } @@ -1354,10 +1380,10 @@ void *handle(void *rq_data) else { LOG("Websocket: Text data is not supported"); - pthread_mutex_lock(&g_tunnel.lock); + pthread_mutex_lock(&tunnel_conf->lock); // ws_close(rq->client, 1011); - bst_for_each(g_tunnel.channels, unsubscribe_notify, argv, 1); - pthread_mutex_unlock(&g_tunnel.lock); + bst_for_each(tunnel_conf->channels, unsubscribe_notify, argv, 1); + pthread_mutex_unlock(&tunnel_conf->lock); free(h); return task; } @@ -1383,20 +1409,20 @@ void *handle(void *rq_data) if (write_msg_to_client(&msg, client) != 0) { // close the connection - pthread_mutex_lock(&g_tunnel.lock); + pthread_mutex_lock(&tunnel_conf->lock); //ws_close(rq->client, 1011); - bst_for_each(g_tunnel.channels, unsubscribe_notify, argv, 1); - pthread_mutex_unlock(&g_tunnel.lock); + bst_for_each(tunnel_conf->channels, unsubscribe_notify, argv, 1); + pthread_mutex_unlock(&tunnel_conf->lock); ERROR("Unable to ping client, close the connection: %d", client->sock); return task; }*/ if (ws_ping(client, "ANTD-TUNNEL", 0) != 0) { // close the connection - pthread_mutex_lock(&g_tunnel.lock); + pthread_mutex_lock(&tunnel_conf->lock); // ws_close(rq->client, 1011); - bst_for_each(g_tunnel.channels, unsubscribe_notify, argv, 1); - pthread_mutex_unlock(&g_tunnel.lock); + bst_for_each(tunnel_conf->channels, unsubscribe_notify, argv, 1); + pthread_mutex_unlock(&tunnel_conf->lock); ERROR("Unable to ping client, close the connection: %d", client->sock); return task; }