diff --git a/antd-config.ini b/antd-config.ini index 8fe3f44..f9b2605 100644 --- a/antd-config.ini +++ b/antd-config.ini @@ -8,6 +8,7 @@ database=/opt/www/database/ ; tmp dir tmpdir=/opt/www/tmp/ ; max concurent connection +statistic_fifo=/opt/www/tmp/antd_stat maxcon=200 ; server backlocg backlog=5000 diff --git a/dist/antd-1.0.5b.tar.gz b/dist/antd-1.0.5b.tar.gz index 06c05b2..6682594 100644 Binary files a/dist/antd-1.0.5b.tar.gz and b/dist/antd-1.0.5b.tar.gz differ diff --git a/http_server.c b/http_server.c index 0ad37f4..d62671c 100644 --- a/http_server.c +++ b/http_server.c @@ -4,10 +4,15 @@ #include #include #include +#include +#include #ifdef USE_OPENSSL #include #include +#include +#else +#include "sha1.h" #endif #include "http_server.h" @@ -62,6 +67,8 @@ void destroy_config() list_free(&server_config.gzip_types); if (server_config.mimes) freedict(server_config.mimes); + if (server_config.stat_fifo_path) + free(server_config.stat_fifo_path); if (server_config.ports) { chain_t it; @@ -103,16 +110,22 @@ static int config_handler(void *conf, const char *section, const char *name, } else if (MATCH("SERVER", "plugins_ext")) { + if(pconfig->plugins_ext) + free(pconfig->plugins_ext); pconfig->plugins_ext = strdup(value); } else if (MATCH("SERVER", "database")) { + if(pconfig->db_path) + free(pconfig->db_path); pconfig->db_path = strdup(value); if (stat(pconfig->db_path, &st) == -1) mkdirp(pconfig->db_path, 0755); } else if (MATCH("SERVER", "tmpdir")) { + if(pconfig->tmpdir) + free(pconfig->tmpdir); pconfig->tmpdir = strdup(value); if (stat(pconfig->tmpdir, &st) == -1) mkdirp(pconfig->tmpdir, 0755); @@ -121,6 +134,12 @@ static int config_handler(void *conf, const char *section, const char *name, removeAll(pconfig->tmpdir, 0); } } + else if (MATCH("SERVER", "statistic_fifo")) + { + if(pconfig->stat_fifo_path) + free(pconfig->stat_fifo_path); + pconfig->stat_fifo_path = strdup(value); + } else if (MATCH("SERVER", "max_upload_size")) { pconfig->max_upload_size = atoi(value); @@ -150,14 +169,20 @@ static int config_handler(void *conf, const char *section, const char *name, #ifdef USE_OPENSSL else if (MATCH("SERVER", "ssl.cert")) { + if(pconfig->sslcert) + free(pconfig->sslcert); pconfig->sslcert = strdup(value); } else if (MATCH("SERVER", "ssl.key")) { + if(pconfig->sslkey) + free(pconfig->sslkey); pconfig->sslkey = strdup(value); } else if (MATCH("SERVER", "ssl.cipher")) { + if(pconfig->ssl_cipher) + free(pconfig->ssl_cipher); pconfig->ssl_cipher = strdup(value); } #endif @@ -220,11 +245,12 @@ static int config_handler(void *conf, const char *section, const char *name, void load_config(const char *file) { server_config.ports = dict(); - server_config.plugins_dir = "plugins/"; - server_config.plugins_ext = ".dylib"; - server_config.db_path = "databases/"; + server_config.plugins_dir = strdup("plugins/"); + server_config.plugins_ext = strdup(".dylib"); + server_config.db_path = strdup("databases/"); //server_config.htdocs = "htdocs/"; - server_config.tmpdir = "tmp/"; + server_config.tmpdir = strdup("tmp/"); + server_config.stat_fifo_path = strdup("/var/run/antd_stat"); server_config.n_workers = 4; server_config.backlog = 1000; server_config.handlers = dict(); @@ -233,8 +259,8 @@ void load_config(const char *file) server_config.connection = 0; server_config.mimes = dict(); server_config.enable_ssl = 0; - server_config.sslcert = "cert.pem"; - server_config.sslkey = "key.pem"; + server_config.sslcert = strdup("cert.pem"); + server_config.sslkey = strdup("key.pem"); server_config.ssl_cipher = NULL; server_config.gzip_enable = 0; server_config.gzip_types = NULL; @@ -297,7 +323,7 @@ void *accept_request(void *data) // perform the ssl handshake if enabled #ifdef USE_OPENSSL int ret = -1, stat; - if (client->ssl && client->status == 0) + if (client->ssl && client->state == ANTD_CLIENT_ACCEPT) { //LOG("Atttempt %d\n", client->attempt); if (SSL_accept((SSL *)client->ssl) == -1) @@ -318,7 +344,7 @@ void *accept_request(void *data) return task; } } - client->status = 1; + client->state = ANTD_CLIENT_HANDSHAKE; task->handle = accept_request; //LOG("Handshake finish for %d\n", client->sock); return task; @@ -334,6 +360,7 @@ void *accept_request(void *data) #endif //LOG("Ready for reading %d\n", client->sock); //server_config.connection++; + client->state = ANTD_CLIENT_PROTO_CHECK; read_buf(rq->client, buf, sizeof(buf)); line = buf; LOG("Request (%d): %s", rq->client->sock, line); @@ -381,6 +408,7 @@ void *resolve_request(void *data) char *newurl = NULL; char *rqp = NULL; char *oldrqp = NULL; + rq->client->state = ANTD_CLIENT_RESOLVE_REQUEST; htdocs(rq, path); strcat(path, url); //LOG("Path is : %s", path); @@ -585,7 +613,7 @@ void *serve_file(void *data) antd_task_t *task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); char *path = (char *)dvalue(rq->request, "ABS_RESOURCE_PATH"); char *mime_type = (char *)dvalue(rq->request, "RESOURCE_MIME"); - + rq->client->state = ANTD_CLIENT_SERVE_FILE; struct stat st; int s = stat(path, &st); @@ -721,6 +749,7 @@ char *apply_rules(dictionary_t rules, const char *host, char *url) void *decode_request_header(void *data) { antd_request_t *rq = (antd_request_t *)data; + rq->client->state = ANTD_CLIENT_HEADER_DECODE; dictionary_t cookie = NULL; char *line; char *token; @@ -737,7 +766,7 @@ void *decode_request_header(void *data) // first real all header // this for check if web socket is enabled - while ((( ret = read_buf(rq->client, buf, sizeof(buf))) > 0) && strcmp("\r\n", buf)) + while (((ret = read_buf(rq->client, buf, sizeof(buf))) > 0) && strcmp("\r\n", buf)) { header_size += ret; line = buf; @@ -763,7 +792,7 @@ void *decode_request_header(void *data) { host = strdup(line); } - if(header_size > HEADER_MAX_SIZE) + if (header_size > HEADER_MAX_SIZE) { antd_error(rq->client, 413, "Payload Too Large"); ERROR("Header size too large (%d): %d vs %d", rq->client->sock, header_size, HEADER_MAX_SIZE); @@ -1256,7 +1285,7 @@ void *execute_plugin(void *data, const char *pname) antd_request_t *rq = (antd_request_t *)data; antd_task_t *task = antd_create_task(NULL, (void *)rq, NULL, rq->client->last_io); //LOG("Plugin name '%s'", pname); - + rq->client->state = ANTD_CLIENT_PLUGIN_EXEC; //load the plugin if ((plugin = plugin_lookup((char *)pname)) == NULL) { diff --git a/httpd.c b/httpd.c index 226c56d..911ad72 100644 --- a/httpd.c +++ b/httpd.c @@ -4,6 +4,7 @@ #include #include #endif +#include #include #include #include @@ -13,8 +14,11 @@ #include "plugin_manager.h" #include "lib/utils.h" +#define SEND_STAT(fd, buff, ret, ...) \ + snprintf(buff, BUFFLEN, ##__VA_ARGS__); \ + ret = write(fd, buff, strlen(buff)); -static antd_scheduler_t scheduler; +static antd_scheduler_t scheduler; #ifdef USE_OPENSSL @@ -25,34 +29,33 @@ static antd_scheduler_t scheduler; static int ssl_session_ctx_id = 1; SSL_CTX *ctx; static void init_openssl() -{ - SSL_load_error_strings(); - OpenSSL_add_ssl_algorithms(); +{ + SSL_load_error_strings(); + OpenSSL_add_ssl_algorithms(); } - static SSL_CTX *create_context() { - const SSL_METHOD *method; - SSL_CTX *ctx; + const SSL_METHOD *method; + SSL_CTX *ctx; - method = SSLv23_server_method(); + method = SSLv23_server_method(); - ctx = SSL_CTX_new(method); - if (!ctx) { + ctx = SSL_CTX_new(method); + if (!ctx) + { ERROR("Unable to create SSL context"); ERR_print_errors_fp(stderr); exit(EXIT_FAILURE); - } + } - return ctx; + return ctx; } #if OPENSSL_VERSION_NUMBER >= 0x10002000L static unsigned char antd_protocols[] = { //TODO: add support to HTTP/2 protocol: 2,'h', '2', - 8, 'h', 't', 't', 'p', '/', '1', '.', '1' -}; -static int alpn_advertise_protos_cb(SSL *ssl, const unsigned char **out, unsigned int *outlen,void *arg) + 8, 'h', 't', 't', 'p', '/', '1', '.', '1'}; +static int alpn_advertise_protos_cb(SSL *ssl, const unsigned char **out, unsigned int *outlen, void *arg) { UNUSED(ssl); UNUSED(arg); @@ -64,7 +67,7 @@ static int alpn_select_cb(SSL *ssl, const unsigned char **out, unsigned char *ou { UNUSED(ssl); UNUSED(arg); - if(SSL_select_next_proto((unsigned char **)out, outlen,antd_protocols,sizeof(antd_protocols),in, inlen) == OPENSSL_NPN_NEGOTIATED) + if (SSL_select_next_proto((unsigned char **)out, outlen, antd_protocols, sizeof(antd_protocols), in, inlen) == OPENSSL_NPN_NEGOTIATED) { return SSL_TLSEXT_ERR_OK; } @@ -78,62 +81,65 @@ static int alpn_select_cb(SSL *ssl, const unsigned char **out, unsigned char *ou static void configure_context(SSL_CTX *ctx) { #if defined(SSL_CTX_set_ecdh_auto) - SSL_CTX_set_ecdh_auto(ctx, 1); + SSL_CTX_set_ecdh_auto(ctx, 1); #else - SSL_CTX_set_tmp_ecdh(ctx, EC_KEY_new_by_curve_name(NID_X9_62_prime256v1)); + SSL_CTX_set_tmp_ecdh(ctx, EC_KEY_new_by_curve_name(NID_X9_62_prime256v1)); #endif - //SSL_CTX_set_ecdh_auto(ctx, 1); + //SSL_CTX_set_ecdh_auto(ctx, 1); /* Set some options and the session id. * SSL_OP_NO_SSLv2: SSLv2 is insecure, disable it. * SSL_OP_NO_TICKET: We don't want TLS tickets used because this is an SSL server caching example. * It should be fine to use tickets in addition to server side caching. */ - SSL_CTX_set_options(ctx, SSL_OP_NO_TLSv1|SSL_OP_NO_TLSv1_1|SSL_OP_NO_SSLv2|SSL_OP_NO_TICKET); - SSL_CTX_set_session_id_context(ctx, (void *)&ssl_session_ctx_id, sizeof(ssl_session_ctx_id)); - // set the cipher suit - config_t * cnf = config(); - const char* suit = cnf->ssl_cipher?cnf->ssl_cipher:CIPHER_SUIT; + SSL_CTX_set_options(ctx, SSL_OP_NO_TLSv1 | SSL_OP_NO_TLSv1_1 | SSL_OP_NO_SSLv2 | SSL_OP_NO_TICKET); + SSL_CTX_set_session_id_context(ctx, (void *)&ssl_session_ctx_id, sizeof(ssl_session_ctx_id)); + // set the cipher suit + config_t *cnf = config(); + const char *suit = cnf->ssl_cipher ? cnf->ssl_cipher : CIPHER_SUIT; LOG("Cirpher suit used: %s", suit); - if (SSL_CTX_set_cipher_list(ctx, suit) != 1) - { + if (SSL_CTX_set_cipher_list(ctx, suit) != 1) + { ERROR("Fail to set ssl cirpher suit: %s", suit); - ERR_print_errors_fp(stderr); - exit(EXIT_FAILURE); - } - /* Set the key and cert */ + ERR_print_errors_fp(stderr); + exit(EXIT_FAILURE); + } + /* Set the key and cert */ /* use the full chain bundle of certificate */ - //if (SSL_CTX_use_certificate_file(ctx, server_config->sslcert, SSL_FILETYPE_PEM) <= 0) { - if (SSL_CTX_use_certificate_chain_file(ctx, cnf->sslcert) <= 0) { + //if (SSL_CTX_use_certificate_file(ctx, server_config->sslcert, SSL_FILETYPE_PEM) <= 0) { + if (SSL_CTX_use_certificate_chain_file(ctx, cnf->sslcert) <= 0) + { ERROR("Fail to read SSL certificate chain file: %s", cnf->sslcert); - ERR_print_errors_fp(stderr); + ERR_print_errors_fp(stderr); exit(EXIT_FAILURE); - } + } - if (SSL_CTX_use_PrivateKey_file(ctx, cnf->sslkey, SSL_FILETYPE_PEM) <= 0 ) { + if (SSL_CTX_use_PrivateKey_file(ctx, cnf->sslkey, SSL_FILETYPE_PEM) <= 0) + { ERROR("Fail to read SSL private file: %s", cnf->sslkey); - ERR_print_errors_fp(stderr); + ERR_print_errors_fp(stderr); exit(EXIT_FAILURE); - } - if (!SSL_CTX_check_private_key(ctx)) { - ERROR("Failed to validate SSL certificate"); - ERR_print_errors_fp(stderr); + } + if (!SSL_CTX_check_private_key(ctx)) + { + ERROR("Failed to validate SSL certificate"); + ERR_print_errors_fp(stderr); exit(EXIT_FAILURE); - } + } #if OPENSSL_VERSION_NUMBER >= 0x10002000L - SSL_CTX_set_alpn_select_cb(ctx,alpn_select_cb, NULL); - SSL_CTX_set_next_protos_advertised_cb(ctx,alpn_advertise_protos_cb,NULL); + SSL_CTX_set_alpn_select_cb(ctx, alpn_select_cb, NULL); + SSL_CTX_set_next_protos_advertised_cb(ctx, alpn_advertise_protos_cb, NULL); #endif } #endif - -static void stop_serve(int dummy) { +static void stop_serve(int dummy) +{ UNUSED(dummy); // close log server - closelog (); + closelog(); sigset_t mask; - sigemptyset(&mask); + sigemptyset(&mask); //Blocks the SIG_IGN signal (by adding SIG_IGN to newMask) sigaddset(&mask, SIGINT); sigaddset(&mask, SIGPIPE); @@ -153,23 +159,23 @@ static void stop_serve(int dummy) { // DEPRECATED: ERR_remove_state(0); ERR_free_strings(); #endif - destroy_config(); - sigprocmask(SIG_UNBLOCK, &mask, NULL); + destroy_config(); + sigprocmask(SIG_UNBLOCK, &mask, NULL); } -static void* antd_monitor(port_config_t* pcnf) +static void *antd_monitor(port_config_t *pcnf) { - antd_task_t* task = NULL; + antd_task_t *task = NULL; struct timeval timeout; int client_sock = -1; struct sockaddr_in client_name; socklen_t client_name_len = sizeof(client_name); - char* client_ip = NULL; - config_t* conf = config(); + char *client_ip = NULL; + config_t *conf = config(); LOG("Listening on port %d", pcnf->port); while (scheduler.status) { - if(conf->connection > conf->maxcon) + if (conf->connection > conf->maxcon) { //ERROR("Reach max connection %d", conf->connection); timeout.tv_sec = 0; @@ -177,30 +183,30 @@ static void* antd_monitor(port_config_t* pcnf) select(0, NULL, NULL, NULL, &timeout); continue; } - if(pcnf->sock > 0) + if (pcnf->sock > 0) { - client_sock = accept(pcnf->sock,(struct sockaddr *)&client_name,&client_name_len); + client_sock = accept(pcnf->sock, (struct sockaddr *)&client_name, &client_name_len); if (client_sock > 0) { // just dump the scheduler when we have a connection - antd_client_t* client = (antd_client_t*)malloc(sizeof(antd_client_t)); - antd_request_t* request = (antd_request_t*)malloc(sizeof(*request)); + antd_client_t *client = (antd_client_t *)malloc(sizeof(antd_client_t)); + antd_request_t *request = (antd_request_t *)malloc(sizeof(*request)); request->client = client; request->request = dict(); client->zstream = NULL; client->z_level = ANTD_CNONE; - + dictionary_t xheader = dict(); dput(request->request, "REQUEST_HEADER", xheader); dput(request->request, "REQUEST_DATA", dict()); dput(xheader, "SERVER_PORT", (void *)__s("%d", pcnf->port)); - dput(xheader, "SERVER_WWW_ROOT", (void*)strdup(pcnf->htdocs)); + dput(xheader, "SERVER_WWW_ROOT", (void *)strdup(pcnf->htdocs)); /* get the remote IP */ if (client_name.sin_family == AF_INET) { - client_ip = inet_ntoa(client_name.sin_addr); + client_ip = inet_ntoa(client_name.sin_addr); LOG("Connect to client IP: %s on port:%d (%d)", client_ip, pcnf->port, client_sock); // ip address dput(xheader, "REMOTE_ADDR", (void *)strdup(client_ip)); @@ -209,20 +215,22 @@ static void* antd_monitor(port_config_t* pcnf) // set timeout to socket set_nonblock(client_sock); - + client->sock = client_sock; time(&client->last_io); client->ssl = NULL; + client->state = ANTD_CLIENT_ACCEPT; + client->z_status = 0; #ifdef USE_OPENSSL - client->status = 0; - if(pcnf->usessl == 1) + if (pcnf->usessl == 1) { - client->ssl = (void*)SSL_new(ctx); - if(!client->ssl) continue; - SSL_set_fd((SSL*)client->ssl, client->sock); + client->ssl = (void *)SSL_new(ctx); + if (!client->ssl) + continue; + SSL_set_fd((SSL *)client->ssl, client->sock); // this can be used in the protocol select callback to // set the protocol selected by the server - if(!SSL_set_ex_data((SSL*)client->ssl, client->sock, client)) + if (!SSL_set_ex_data((SSL *)client->ssl, client->sock, client)) { ERROR("Cannot set ex data to ssl client:%d", client->sock); } @@ -238,7 +246,7 @@ static void* antd_monitor(port_config_t* pcnf) conf->connection++; pthread_mutex_unlock(&scheduler.scheduler_lock); // create callback for the server - task = antd_create_task(accept_request,(void*)request, finish_request, client->last_io); + task = antd_create_task(accept_request, (void *)request, finish_request, client->last_io); //task->type = LIGHT; antd_add_task(&scheduler, task); } @@ -247,44 +255,94 @@ static void* antd_monitor(port_config_t* pcnf) return NULL; } -int main(int argc, char* argv[]) +static void client_statistic(int fd, void *user_data) +{ + antd_request_t *request = (antd_request_t *)user_data; + chain_t it, it1; + dictionary_t tmp; + int ret; + char buff[BUFFLEN]; + if (request == NULL) + { + SEND_STAT(fd, buff, ret, "Data is null\n"); + return; + } + // send client general infomation + SEND_STAT(fd, buff, ret, "Client id: %d\n", request->client->sock); + SEND_STAT(fd, buff, ret, "Last IO: %lu\n", (unsigned long)request->client->last_io); + SEND_STAT(fd, buff, ret, "Current state: %d\n", request->client->state); + SEND_STAT(fd, buff, ret, "z_level: %d\n", request->client->z_level); + if (request->client->ssl) + { + SEND_STAT(fd, buff, ret, "SSL is enabled\n"); + } + // send client request detail + if (request->request) + { + for_each_assoc(it, request->request) + { + if (strcmp(it->key, "REQUEST_HEADER") == 0 || + strcmp(it->key, "REQUEST_DATA") == 0 || + strcmp(it->key, "COOKIE") == 0) + { + tmp = (dictionary_t)it->value; + if (tmp) + { + for_each_assoc(it1, tmp) + { + SEND_STAT(fd, buff, ret, "%s: %s\n", it1->key, (char *)it1->value); + } + } + } + else + { + SEND_STAT(fd, buff, ret, "%s: %s\n", it->key, (char *)it->value); + } + } + } + UNUSED(ret); +} + +int main(int argc, char *argv[]) { pthread_t monitor_th; // startup port chain_t it; - port_config_t * pcnf; + port_config_t *pcnf; int nlisten = 0; -// load the config first - if(argc==1) + // load the config first + if (argc == 1) load_config(CONFIG_FILE); else load_config(argv[1]); - // ignore the broken PIPE error when writing + // ignore the broken PIPE error when writing //or reading to/from a closed socked connection signal(SIGPIPE, SIG_IGN); signal(SIGABRT, SIG_IGN); signal(SIGINT, stop_serve); - config_t* conf = config(); + config_t *conf = config(); // start syslog - setlogmask (LOG_UPTO (LOG_NOTICE)); - openlog (SERVER_NAME, LOG_CONS | LOG_PID | LOG_NDELAY, LOG_DAEMON); + setlogmask(LOG_UPTO(LOG_NOTICE)); + openlog(SERVER_NAME, LOG_CONS | LOG_PID | LOG_NDELAY, LOG_DAEMON); #ifdef USE_OPENSSL - if( conf->enable_ssl == 1 ) + if (conf->enable_ssl == 1) { init_openssl(); - ctx = create_context(); + ctx = create_context(); - configure_context(ctx); + configure_context(ctx); } - + #endif // enable scheduler // default to 4 workers scheduler.validate_data = 1; scheduler.destroy_data = finish_request; - if(antd_scheduler_init(&scheduler, conf->n_workers) == -1) + strncpy(scheduler.stat_fifo, conf->stat_fifo_path, MAX_FIFO_NAME_SZ); + scheduler.stat_data_cb = client_statistic; + if (antd_scheduler_init(&scheduler, conf->n_workers) == -1) { ERROR("Unable to initialise scheduler. Exit"); stop_serve(0); @@ -292,13 +350,13 @@ int main(int argc, char* argv[]) } for_each_assoc(it, conf->ports) { - pcnf = (port_config_t*)it->value; - if(pcnf) + pcnf = (port_config_t *)it->value; + if (pcnf) { pcnf->sock = startup(&pcnf->port); - if(pcnf->sock>0) + if (pcnf->sock > 0) { - if (pthread_create(&monitor_th, NULL,(void *(*)(void *))antd_monitor, (void*)pcnf) != 0) + if (pthread_create(&monitor_th, NULL, (void *(*)(void *))antd_monitor, (void *)pcnf) != 0) { ERROR("pthread_create: cannot create worker"); stop_serve(0); @@ -317,7 +375,7 @@ int main(int argc, char* argv[]) } } } - if(nlisten == 0) + if (nlisten == 0) { ERROR("No port is listenned, quit!!"); stop_serve(0); @@ -325,5 +383,5 @@ int main(int argc, char* argv[]) } antd_wait(&scheduler); stop_serve(0); - return(0); + return (0); } diff --git a/lib/handle.c b/lib/handle.c index 6d67d2e..446f507 100644 --- a/lib/handle.c +++ b/lib/handle.c @@ -6,6 +6,7 @@ #include #include #include +#include #include //open ssl #ifdef USE_OPENSSL @@ -295,7 +296,7 @@ void antd_send_header(void *cl, antd_response_header_t *res) } else { - client->status = Z_NO_FLUSH; + client->z_status = Z_NO_FLUSH; dput(res->header, "Content-Encoding", strdup("gzip")); } } @@ -309,7 +310,7 @@ void antd_send_header(void *cl, antd_response_header_t *res) } else { - client->status = Z_NO_FLUSH; + client->z_status = Z_NO_FLUSH; dput(res->header, "Content-Encoding", strdup("deflate")); } } @@ -384,7 +385,7 @@ int antd_send(void *src, const void *data_in, int len_in) { zstream->avail_out = BUFFLEN; zstream->next_out = buf; - if (deflate(zstream, source->status) == Z_STREAM_ERROR) + if (deflate(zstream, source->z_status) == Z_STREAM_ERROR) { source->z_level = current_zlevel; data = NULL; @@ -729,9 +730,9 @@ int antd_close(void *src) //TODO: send finish data to the socket before quit if (source->zstream) { - if (source->status == Z_NO_FLUSH && source->z_level != ANTD_CNONE) + if (source->z_status == Z_NO_FLUSH && source->z_level != ANTD_CNONE) { - source->status = Z_FINISH; + source->z_status = Z_FINISH; antd_send(source, "", 0); } deflateEnd(source->zstream); diff --git a/lib/handle.h b/lib/handle.h index 41445fc..5af3ed2 100644 --- a/lib/handle.h +++ b/lib/handle.h @@ -6,44 +6,59 @@ #include "list.h" #include "dictionary.h" - #define SERVER_NAME "Antd" -#define IS_POST(method) (strcmp(method,"POST")== 0) -#define IS_GET(method) (strcmp(method,"GET")== 0) -#define R_STR(d,k) ((char*)dvalue(d,k)) -#define R_INT(d,k) (atoi(dvalue(d,k))) -#define R_FLOAT(d,k) ((double)atof(dvalue(d,k))) -#define R_PTR(d,k) (dvalue(d,k)) +#define IS_POST(method) (strcmp(method, "POST") == 0) +#define IS_GET(method) (strcmp(method, "GET") == 0) +#define R_STR(d, k) ((char *)dvalue(d, k)) +#define R_INT(d, k) (atoi(dvalue(d, k))) +#define R_FLOAT(d, k) ((double)atof(dvalue(d, k))) +#define R_PTR(d, k) (dvalue(d, k)) #define __RESULT__ "{\"result\":%d,\"msg\":\"%s\"}" -#define FORM_URL_ENCODE "application/x-www-form-urlencoded" -#define FORM_MULTI_PART "multipart/form-data" +#define FORM_URL_ENCODE "application/x-www-form-urlencoded" +#define FORM_MULTI_PART "multipart/form-data" #define MAX_IO_WAIT_TIME 5 // second +#define ANTD_CLIENT_ACCEPT 0x0 +#define ANTD_CLIENT_HANDSHAKE 0x1 +#define ANTD_CLIENT_HEADER_DECODE 0x2 +#define ANTD_CLIENT_PLUGIN_EXEC 0x3 +#define ANTD_CLIENT_PROTO_CHECK 0x4 +#define ANTD_CLIENT_RESOLVE_REQUEST 0x5 +#define ANTD_CLIENT_SERVE_FILE 0x6 -typedef enum {ANTD_CGZ, ANTD_CDEFL, ANTD_CNONE} antd_compress_t; +typedef enum +{ + ANTD_CGZ, + ANTD_CDEFL, + ANTD_CNONE +} antd_compress_t; //extern config_t server_config; -typedef struct { +typedef struct +{ unsigned int port; int usessl; - char* htdocs; + char *htdocs; int sock; dictionary_t rules; } port_config_t; -typedef struct{ +typedef struct +{ int sock; - void* ssl; - int status; + void *ssl; + int state; time_t last_io; // compress antd_compress_t z_level; - void* zstream; + void *zstream; + int z_status; } antd_client_t; -typedef struct { - antd_client_t* client; +typedef struct +{ + antd_client_t *client; dictionary_t request; } antd_request_t; @@ -55,15 +70,15 @@ typedef struct } antd_response_header_t; - - -typedef struct { - //int port; - char *plugins_dir; +typedef struct +{ + //int port; + char *plugins_dir; char *plugins_ext; char *db_path; //char* htdocs; - char* tmpdir; + char *tmpdir; + char *stat_fifo_path; dictionary_t handlers; int backlog; int maxcon; @@ -72,51 +87,51 @@ typedef struct { int max_upload_size; // ssl int enable_ssl; - char* sslcert; - char* sslkey; - char* ssl_cipher; + char *sslcert; + char *sslkey; + char *ssl_cipher; int gzip_enable; list_t gzip_types; dictionary_t mimes; dictionary_t ports; -// #endif -}config_t; + // #endif +} config_t; -typedef struct { - char name[128]; +typedef struct +{ + char name[128]; char dbpath[512]; char tmpdir[512]; char pdir[512]; int raw_body; } plugin_header_t; +int __attribute__((weak)) require_plugin(const char *); +void __attribute__((weak)) htdocs(antd_request_t *rq, char *dest); +void __attribute__((weak)) dbdir(char *dest); +void __attribute__((weak)) tmpdir(char *dest); +void __attribute__((weak)) plugindir(char *dest); -int __attribute__((weak)) require_plugin(const char*); -void __attribute__((weak)) htdocs(antd_request_t* rq, char* dest); -void __attribute__((weak)) dbdir(char* dest); -void __attribute__((weak)) tmpdir(char* dest); -void __attribute__((weak)) plugindir(char* dest); - -int __attribute__((weak)) compressable(char* ctype); +int __attribute__((weak)) compressable(char *ctype); void set_nonblock(int socket); //void set_block(int socket); -void antd_send_header(void*,antd_response_header_t*); -const char* get_status_str(int stat); -int __t(void*, const char*,...); -int __b(void*, const unsigned char*, int); -int __f(void*, const char*); +void antd_send_header(void *, antd_response_header_t *); +const char *get_status_str(int stat); +int __t(void *, const char *, ...); +int __b(void *, const unsigned char *, int); +int __f(void *, const char *); -int upload(const char*, const char*); +int upload(const char *, const char *); /*Default function for plugin*/ -void antd_error(void* client, int status, const char* msg); +void antd_error(void *client, int status, const char *msg); int ws_enable(dictionary_t); -int read_buf(void* sock,char* buf,int i); -int antd_send( void *source, const void* data, int len); -int antd_recv( void *source, void* data, int len); -int antd_close(void* source); +int read_buf(void *sock, char *buf, int i); +int antd_send(void *source, const void *data, int len); +int antd_recv(void *source, void *data, int len); +int antd_close(void *source); void destroy_request(void *data); #endif diff --git a/lib/scheduler.c b/lib/scheduler.c index b4af579..a2be0f6 100644 --- a/lib/scheduler.c +++ b/lib/scheduler.c @@ -1,18 +1,33 @@ #include #include #include +#include +#include +#include +#include +#include #include "scheduler.h" #include "utils.h" -static void enqueue(antd_task_queue_t* q, antd_task_t* task) +static void set_nonblock(int fd) +{ + int flags; + flags = fcntl(fd, F_GETFL, 0); + if(flags == -1) + { + ERROR("Unable to set flag"); + } + fcntl(fd, F_SETFL, flags | O_NONBLOCK); +} +static void enqueue(antd_task_queue_t *q, antd_task_t *task) { antd_task_item_t it = *q; - while(it && it->next != NULL) + while (it && it->next != NULL) it = it->next; antd_task_item_t taski = (antd_task_item_t)malloc(sizeof *taski); taski->task = task; taski->next = NULL; - if(!it) // first task + if (!it) // first task { *q = taski; } @@ -22,19 +37,20 @@ static void enqueue(antd_task_queue_t* q, antd_task_t* task) } } - -static void stop(antd_scheduler_t* scheduler) +static void stop(antd_scheduler_t *scheduler) { scheduler->status = 0; // unlock all idle workers if any for (int i = 0; i < scheduler->n_workers; i++) sem_post(scheduler->worker_sem); - if(scheduler->scheduler_sem) + if (scheduler->scheduler_sem) sem_post(scheduler->scheduler_sem); for (int i = 0; i < scheduler->n_workers; i++) - if(scheduler->workers[i].id != -1) + if (scheduler->workers[i].id != -1) pthread_join(scheduler->workers[i].tid, NULL); - if(scheduler->workers) free(scheduler->workers); + if (scheduler->workers) + free(scheduler->workers); + (void)pthread_join(scheduler->stat_tid, NULL); // destroy all the mutex pthread_mutex_destroy(&scheduler->scheduler_lock); pthread_mutex_destroy(&scheduler->worker_lock); @@ -45,10 +61,10 @@ static void stop(antd_scheduler_t* scheduler) sem_close(scheduler->worker_sem); } -static antd_task_item_t dequeue(antd_task_queue_t* q) +static antd_task_item_t dequeue(antd_task_queue_t *q) { antd_task_item_t it = *q; - if(it) + if (it) { *q = it->next; it->next = NULL; @@ -56,24 +72,23 @@ static antd_task_item_t dequeue(antd_task_queue_t* q) return it; } - -antd_callback_t* callback_of( void* (*callback)(void*) ) +antd_callback_t *callback_of(void *(*callback)(void *)) { - antd_callback_t* cb = NULL; - if(callback) + antd_callback_t *cb = NULL; + if (callback) { - cb = (antd_callback_t*)malloc(sizeof *cb); + cb = (antd_callback_t *)malloc(sizeof *cb); cb->handle = callback; cb->next = NULL; } return cb; } -static void free_callback(antd_callback_t* cb) +static void free_callback(antd_callback_t *cb) { - antd_callback_t* it = cb; - antd_callback_t* curr; - while(it) + antd_callback_t *it = cb; + antd_callback_t *curr; + while (it) { curr = it; it = it->next; @@ -81,25 +96,26 @@ static void free_callback(antd_callback_t* cb) } } -static void enqueue_callback(antd_callback_t* cb, antd_callback_t* el) +static void enqueue_callback(antd_callback_t *cb, antd_callback_t *el) { - antd_callback_t* it = cb; - while(it && it->next != NULL) + antd_callback_t *it = cb; + while (it && it->next != NULL) it = it->next; - if(!it) return; // this should not happend + if (!it) + return; // this should not happend it->next = el; } -static void execute_callback(antd_scheduler_t* scheduler, antd_task_t* task) +static void execute_callback(antd_scheduler_t *scheduler, antd_task_t *task) { - antd_callback_t* cb = task->callback; - if(cb) + antd_callback_t *cb = task->callback; + if (cb) { // call the first come call back task->handle = cb->handle; task->callback = task->callback->next; task->priority = task->priority + 1; - if(task->priority > N_PRIORITY - 1) + if (task->priority > N_PRIORITY - 1) { task->priority = N_PRIORITY - 1; } @@ -116,21 +132,23 @@ static void destroy_queue(antd_task_queue_t q) { antd_task_item_t it, curr; it = q; - while(it) + while (it) { // first free the task - if(it->task && it->task->callback) free_callback(it->task->callback); - if(it->task) free(it->task); + if (it->task && it->task->callback) + free_callback(it->task->callback); + if (it->task) + free(it->task); // then free the placeholder curr = it; it = it->next; free(curr); } } -static void* work(antd_worker_t* worker) +static void *work(antd_worker_t *worker) { - antd_scheduler_t* scheduler = (antd_scheduler_t*) worker->manager; - while(scheduler->status) + antd_scheduler_t *scheduler = (antd_scheduler_t *)worker->manager; + while (scheduler->status) { antd_task_item_t it; pthread_mutex_lock(&scheduler->worker_lock); @@ -139,7 +157,7 @@ static void* work(antd_worker_t* worker) // execute the task //LOG("task executed by worker %d\n", worker->pid); // no task to execute, just sleep wait - if(!it) + if (!it) { //LOG("Worker %d goes to idle state\n", worker->id); sem_wait(scheduler->worker_sem); @@ -149,7 +167,86 @@ static void* work(antd_worker_t* worker) //LOG("task executed by worker %d\n", worker->id); antd_execute_task(scheduler, it); } - + } + return NULL; +} + +static void *statistic(antd_scheduler_t *scheduler) +{ + struct pollfd fdp; + int ret; + char buffer[MAX_FIFO_NAME_SZ]; + antd_task_item_t it; + while (scheduler->status) + { + if (scheduler->stat_fd == -1) + { + scheduler->stat_fd = open(scheduler->stat_fifo, O_RDWR); + if (scheduler->stat_fd == -1) + { + ERROR("Unable to open FIFO %s: %s", scheduler->stat_fifo, strerror(errno)); + return NULL; + } + } + fdp.fd = scheduler->stat_fd; + fdp.events = POLLOUT; + // poll the fd in blocking mode + ret = poll(&fdp, 1, -1); + + if (ret > 0 && (fdp.revents & POLLOUT) && scheduler->pending_task > 0) + { + pthread_mutex_lock(&scheduler->scheduler_lock); + // write statistic data + snprintf(buffer, MAX_FIFO_NAME_SZ, "Pending task: %d. Detail:\n", scheduler->pending_task); + ret = write(scheduler->stat_fd, buffer, strlen(buffer)); + + for (int i = 0; i < N_PRIORITY; i++) + { + snprintf(buffer, MAX_FIFO_NAME_SZ, "#### PRIORITY: %d\n", i); + ret = write(scheduler->stat_fd, buffer, strlen(buffer)); + + it = scheduler->task_queue[i]; + while (it) + { + // send statistic on task data + snprintf(buffer, MAX_FIFO_NAME_SZ, "---- Task created at: %lu ----\n", it->task->stamp); + ret = write(scheduler->stat_fd, buffer, strlen(buffer)); + + // send statistic on task data + snprintf(buffer, MAX_FIFO_NAME_SZ, "Access time: %lu\nn", (unsigned long)it->task->access_time); + ret = write(scheduler->stat_fd, buffer, strlen(buffer)); + + snprintf(buffer, MAX_FIFO_NAME_SZ, "Current time: %lu\n", (unsigned long)time(NULL)); + ret = write(scheduler->stat_fd, buffer, strlen(buffer)); + + snprintf(buffer, MAX_FIFO_NAME_SZ, "Task type: %d\n", it->task->type); + ret = write(scheduler->stat_fd, buffer, strlen(buffer)); + + if (it->task->handle) + { + snprintf(buffer, MAX_FIFO_NAME_SZ, "Has handle: yes\n"); + ret = write(scheduler->stat_fd, buffer, strlen(buffer)); + } + + if (it->task->callback) + { + snprintf(buffer, MAX_FIFO_NAME_SZ, "Has callback: yes\n"); + ret = write(scheduler->stat_fd, buffer, strlen(buffer)); + } + + // now print all task data statistic + if (scheduler->stat_data_cb) + { + scheduler->stat_data_cb(scheduler->stat_fd, it->task->data); + } + it = it->next; + } + } + pthread_mutex_unlock(&scheduler->scheduler_lock); + ret = close(scheduler->stat_fd); + scheduler->stat_fd = -1; + usleep(5000); + } } return NULL; } @@ -159,14 +256,17 @@ static void* work(antd_worker_t* worker) init the main scheduler */ -int antd_scheduler_init(antd_scheduler_t* scheduler, int n) +int antd_scheduler_init(antd_scheduler_t *scheduler, int n) { scheduler->n_workers = n; scheduler->status = 1; scheduler->workers_queue = NULL; - scheduler->pending_task = 0 ; + scheduler->pending_task = 0; scheduler->validate_data = 0; scheduler->destroy_data = NULL; + scheduler->stat_fd = -1; + //scheduler->stat_data_cb = NULL; + //memset(scheduler->stat_fifo, 0, MAX_FIFO_NAME_SZ); // init semaphore scheduler->scheduler_sem = sem_open("scheduler", O_CREAT, 0600, 0); if (scheduler->scheduler_sem == SEM_FAILED) @@ -180,25 +280,26 @@ int antd_scheduler_init(antd_scheduler_t* scheduler, int n) ERROR("Cannot open semaphore for workers"); return -1; } - // init lock - pthread_mutex_init(&scheduler->scheduler_lock,NULL); + // init lock + pthread_mutex_init(&scheduler->scheduler_lock, NULL); pthread_mutex_init(&scheduler->worker_lock, NULL); pthread_mutex_init(&scheduler->pending_lock, NULL); - for(int i = 0; i < N_PRIORITY; i++) scheduler->task_queue[i] = NULL; + for (int i = 0; i < N_PRIORITY; i++) + scheduler->task_queue[i] = NULL; // create scheduler.workers - if(n > 0) + if (n > 0) { - scheduler->workers = (antd_worker_t*)malloc(n*(sizeof(antd_worker_t))); - if(!scheduler->workers) + scheduler->workers = (antd_worker_t *)malloc(n * (sizeof(antd_worker_t))); + if (!scheduler->workers) { ERROR("Cannot allocate memory for worker"); return -1; } - for(int i = 0; i < scheduler->n_workers;i++) + for (int i = 0; i < scheduler->n_workers; i++) { scheduler->workers[i].id = -1; - scheduler->workers[i].manager = (void*)scheduler; - if (pthread_create(&scheduler->workers[i].tid, NULL,(void *(*)(void *))work, (void*)&scheduler->workers[i]) != 0) + scheduler->workers[i].manager = (void *)scheduler; + if (pthread_create(&scheduler->workers[i].tid, NULL, (void *(*)(void *))work, (void *)&scheduler->workers[i]) != 0) { ERROR("pthread_create: cannot create worker: %s", strerror(errno)); return -1; @@ -209,6 +310,34 @@ int antd_scheduler_init(antd_scheduler_t* scheduler, int n) } } } + // delete the fifo if any + if (scheduler->stat_fifo[0] != '\0') + { + LOG("Statistic fifo at: %s", scheduler->stat_fifo); + (void)remove(scheduler->stat_fifo); + // create the fifo file + if (mkfifo(scheduler->stat_fifo, 0666) == -1) + { + ERROR("Unable to create statictis FIFO %s: %s", scheduler->stat_fifo, strerror(errno)); + } + else + { + // open the fifo in write mode + scheduler->stat_fd = open(scheduler->stat_fifo, O_RDWR); + if (scheduler->stat_fd == -1) + { + ERROR("Unable to open FIFO %s: %s", scheduler->stat_fifo, strerror(errno)); + } + else + { + set_nonblock(scheduler->stat_fd); + if (pthread_create(&scheduler->stat_tid, NULL, (void *(*)(void *))statistic, scheduler) != 0) + { + ERROR("pthread_create: cannot create statistic thread: %s", strerror(errno)); + } + } + } + } LOG("Antd scheduler initialized with %d worker", scheduler->n_workers); return 0; } @@ -216,12 +345,12 @@ int antd_scheduler_init(antd_scheduler_t* scheduler, int n) destroy all pending task pthread_mutex_lock(&scheduler.queue_lock); */ -void antd_scheduler_destroy(antd_scheduler_t* scheduler) +void antd_scheduler_destroy(antd_scheduler_t *scheduler) { // free all the chains stop(scheduler); LOG("Destroy remaining queue"); - for(int i=0; i < N_PRIORITY; i++) + for (int i = 0; i < N_PRIORITY; i++) { destroy_queue(scheduler->task_queue[i]); } @@ -231,9 +360,9 @@ void antd_scheduler_destroy(antd_scheduler_t* scheduler) /* create a task */ -antd_task_t* antd_create_task(void* (*handle)(void*), void *data, void* (*callback)(void*), time_t atime) +antd_task_t *antd_create_task(void *(*handle)(void *), void *data, void *(*callback)(void *), time_t atime) { - antd_task_t* task = (antd_task_t*)malloc(sizeof *task); + antd_task_t *task = (antd_task_t *)malloc(sizeof *task); task->stamp = (unsigned long)time(NULL); task->data = data; task->handle = handle; @@ -248,10 +377,10 @@ antd_task_t* antd_create_task(void* (*handle)(void*), void *data, void* (*callba /* scheduling a task */ -void antd_add_task(antd_scheduler_t* scheduler, antd_task_t* task) +void antd_add_task(antd_scheduler_t *scheduler, antd_task_t *task) { // check if task is exist - int prio = task->priority>N_PRIORITY-1?N_PRIORITY-1:task->priority; + int prio = task->priority > N_PRIORITY - 1 ? N_PRIORITY - 1 : task->priority; //LOG("Prio is %d\n", prio); pthread_mutex_lock(&scheduler->scheduler_lock); enqueue(&scheduler->task_queue[prio], task); @@ -263,15 +392,14 @@ void antd_add_task(antd_scheduler_t* scheduler, antd_task_t* task) sem_post(scheduler->scheduler_sem); } - -void antd_execute_task(antd_scheduler_t* scheduler, antd_task_item_t taski) +void antd_execute_task(antd_scheduler_t *scheduler, antd_task_item_t taski) { - if(!taski) + if (!taski) return; // execute the task void *ret = (*(taski->task->handle))(taski->task->data); // check the return data if it is a new task - if(!ret) + if (!ret) { // call the first callback execute_callback(scheduler, taski->task); @@ -279,10 +407,10 @@ void antd_execute_task(antd_scheduler_t* scheduler, antd_task_item_t taski) } else { - antd_task_t* rtask = (antd_task_t*) ret; - if(taski->task->callback) - { - if(rtask->callback) + antd_task_t *rtask = (antd_task_t *)ret; + if (taski->task->callback) + { + if (rtask->callback) { enqueue_callback(rtask->callback, taski->task->callback); } @@ -291,7 +419,7 @@ void antd_execute_task(antd_scheduler_t* scheduler, antd_task_item_t taski) rtask->callback = taski->task->callback; } } - if(!rtask->handle) + if (!rtask->handle) { // call the first callback execute_callback(scheduler, rtask); @@ -301,7 +429,7 @@ void antd_execute_task(antd_scheduler_t* scheduler, antd_task_item_t taski) else { rtask->priority = taski->task->priority + 1; - if(rtask->priority > N_PRIORITY - 1) + if (rtask->priority > N_PRIORITY - 1) { rtask->priority = N_PRIORITY - 1; } @@ -310,43 +438,43 @@ void antd_execute_task(antd_scheduler_t* scheduler, antd_task_item_t taski) free(taski); } } - pthread_mutex_lock(&scheduler->pending_lock); - scheduler->pending_task--; - pthread_mutex_unlock(&scheduler->pending_lock); } -int antd_scheduler_busy(antd_scheduler_t* scheduler) +int antd_scheduler_busy(antd_scheduler_t *scheduler) { return scheduler->pending_task != 0; } -int antd_task_schedule(antd_scheduler_t* scheduler) +int antd_task_schedule(antd_scheduler_t *scheduler) { // fetch next task from the task_queue antd_task_item_t it = NULL; pthread_mutex_lock(&scheduler->scheduler_lock); - for(int i = 0; i< N_PRIORITY; i++) + for (int i = 0; i < N_PRIORITY; i++) { - + it = dequeue(&scheduler->task_queue[i]); - if(it) + if (it) break; } pthread_mutex_unlock(&scheduler->scheduler_lock); // no task - if(!it) + if (!it) { return 0; } + pthread_mutex_lock(&scheduler->pending_lock); + scheduler->pending_task--; + pthread_mutex_unlock(&scheduler->pending_lock); // has the task now // validate the task - if(scheduler->validate_data && difftime( time(NULL), it->task->access_time) > MAX_VALIDITY_INTERVAL && it->task->priority == N_PRIORITY - 1) + if (scheduler->validate_data && difftime(time(NULL), it->task->access_time) > MAX_VALIDITY_INTERVAL && it->task->priority == N_PRIORITY - 1) { // data task is not valid - // LOG("Task is no longer valid and will be killed"); - if(scheduler->destroy_data) + LOG("Task is no longer valid and will be killed"); + if (scheduler->destroy_data) scheduler->destroy_data(it->task->data); - if(it->task->callback) + if (it->task->callback) free_callback(it->task->callback); free(it->task); free(it); @@ -354,10 +482,10 @@ int antd_task_schedule(antd_scheduler_t* scheduler) } // check the type of task - if(it->task->type == LIGHT || scheduler->n_workers <= 0) + if (it->task->type == LIGHT || scheduler->n_workers <= 0) { // do it by myself - antd_execute_task( scheduler, it); + antd_execute_task(scheduler, it); } else { @@ -372,13 +500,13 @@ int antd_task_schedule(antd_scheduler_t* scheduler) } return 1; } -void antd_wait(antd_scheduler_t* scheduler) +void antd_wait(antd_scheduler_t *scheduler) { int stat; - while(scheduler->status) + while (scheduler->status) { stat = antd_task_schedule(scheduler); - if(!stat) + if (!stat) { // no task found, go to idle state sem_wait(scheduler->scheduler_sem); diff --git a/lib/scheduler.h b/lib/scheduler.h index 1b17b8d..3f53b31 100644 --- a/lib/scheduler.h +++ b/lib/scheduler.h @@ -9,6 +9,7 @@ #define LOW_PRIORITY (N_PRIORITY - 1) #define HIGH_PRIORITY 0 #define MAX_VALIDITY_INTERVAL 20 // 10 s for task validity +#define MAX_FIFO_NAME_SZ 255 typedef enum { LIGHT, @@ -92,6 +93,13 @@ typedef struct */ void* (*destroy_data)(void*); int validate_data; + /** + * statistic infomation + */ + char stat_fifo[MAX_FIFO_NAME_SZ]; + int stat_fd; + pthread_t stat_tid; + void (*stat_data_cb)(int, void *); } antd_scheduler_t; /* diff --git a/lib/ws.c b/lib/ws.c index 5c653f1..6a221c3 100644 --- a/lib/ws.c +++ b/lib/ws.c @@ -415,7 +415,7 @@ int ws_client_connect(ws_client_t* wsclient, port_config_t pcnf) } // will be free wsclient->antdsock->sock = sock; - wsclient->antdsock->status = 0; + wsclient->antdsock->z_status = 0; wsclient->antdsock->last_io = time(NULL); wsclient->antdsock->zstream = NULL; #ifdef USE_OPENSSL