diff --git a/dist/antd-publishers-0.1.0a.tar.gz b/dist/antd-publishers-0.1.0a.tar.gz index 0025247..ab2a18c 100644 Binary files a/dist/antd-publishers-0.1.0a.tar.gz and b/dist/antd-publishers-0.1.0a.tar.gz differ diff --git a/tunnel.c b/tunnel.c index d808254..5f597f2 100644 --- a/tunnel.c +++ b/tunnel.c @@ -11,11 +11,58 @@ #define MODULE_NAME "api" +static int guard_read(int fd, void* buffer, size_t size) +{ + int n = 0; + int read_len; + int st; + while(n != (int)size) + { + read_len = (int)size - n; + st = read(fd,buffer + n,read_len); + if(st == -1) + { + M_ERROR(MODULE_NAME, "Unable to read from #%d: %s", fd, strerror(errno)); + return -1; + } + if(st == 0) + { + M_ERROR(MODULE_NAME,"Endpoint %d is closed", fd); + return -1; + } + n += st; + } + return n; +} + +static int guard_write(int fd, void* buffer, size_t size) +{ + int n = 0; + int write_len; + int st; + while(n != (int)size) + { + write_len = (int)size - n; + st = write(fd,buffer + n,write_len); + if(st == -1) + { + M_ERROR(MODULE_NAME,"Unable to write to #%d: %s", fd, strerror(errno)); + return -1; + } + if(st == 0) + { + M_ERROR(MODULE_NAME,"Endpoint %d is closed", fd); + return -1; + } + n += st; + } + return n; +} static int msg_check_number(int fd, uint16_t number) { uint16_t value; - if(read(fd,&value,sizeof(value)) == -1) + if(guard_read(fd,&value,sizeof(value)) == -1) { M_ERROR(MODULE_NAME, "Unable to read integer value: %s", strerror(errno)); return -1; @@ -30,7 +77,7 @@ static int msg_check_number(int fd, uint16_t number) static int msg_read_string(int fd, char* buffer, uint8_t max_length) { uint8_t size; - if(read(fd,&size,sizeof(size)) == -1) + if(guard_read(fd,&size,sizeof(size)) == -1) { M_ERROR(MODULE_NAME, "Unable to read string size: %s", strerror(errno)); return -1; @@ -40,7 +87,7 @@ static int msg_read_string(int fd, char* buffer, uint8_t max_length) M_ERROR(MODULE_NAME, "String length exceeds the maximal value of %d", max_length); return -1; } - if(read(fd,buffer,size) == -1) + if(guard_read(fd,buffer,size) == -1) { M_ERROR(MODULE_NAME, "Unable to read string to buffer: %s", strerror(errno)); return -1; @@ -51,7 +98,7 @@ static int msg_read_string(int fd, char* buffer, uint8_t max_length) static uint8_t* msg_read_payload(int fd, uint32_t* size) { uint8_t* data; - if(read(fd,size,sizeof(*size)) == -1) + if(guard_read(fd,size,sizeof(*size)) == -1) { M_ERROR(MODULE_NAME, "Unable to read payload data size: %s", strerror(errno)); return NULL; @@ -67,7 +114,7 @@ static uint8_t* msg_read_payload(int fd, uint32_t* size) M_ERROR(MODULE_NAME, "Unable to allocate memory for payload data: %s", strerror(errno)); return NULL; } - if(read(fd,data,*size) == -1) + if(guard_read(fd,data,*size) == -1) { M_ERROR(MODULE_NAME, "Unable to read payload data to buffer: %s", strerror(errno)); free(data); @@ -106,7 +153,7 @@ int msg_read(int fd, tunnel_msg_t* msg) M_ERROR(MODULE_NAME, "Unable to check begin magic number"); return -1; } - if(read(fd,&msg->header.type,sizeof(msg->header.type)) == -1) + if(guard_read(fd,&msg->header.type,sizeof(msg->header.type)) == -1) { M_ERROR(MODULE_NAME, "Unable to read msg type: %s", strerror(errno)); return -1; @@ -116,12 +163,12 @@ int msg_read(int fd, tunnel_msg_t* msg) M_ERROR(MODULE_NAME, "Unknown msg type: %d", msg->header.type); return -1; } - if(read(fd, &msg->header.channel_id, sizeof(msg->header.channel_id)) == -1) + if(guard_read(fd, &msg->header.channel_id, sizeof(msg->header.channel_id)) == -1) { M_ERROR(MODULE_NAME, "Unable to read msg channel id"); return -1; } - if(read(fd, &msg->header.client_id, sizeof(msg->header.client_id)) == -1) + if(guard_read(fd, &msg->header.client_id, sizeof(msg->header.client_id)) == -1) { M_ERROR(MODULE_NAME, "Unable to read msg client id"); return -1; @@ -147,32 +194,32 @@ int msg_write(int fd, tunnel_msg_t* msg) { // write begin magic number uint16_t number = MSG_MAGIC_BEGIN; - if(write(fd,&number, sizeof(number)) == -1) + if(guard_write(fd,&number, sizeof(number)) == -1) { M_ERROR(MODULE_NAME, "Unable to write begin magic number: %s", strerror(errno)); return -1; } // write type - if(write(fd,&msg->header.type, sizeof(msg->header.type)) == -1) + if(guard_write(fd,&msg->header.type, sizeof(msg->header.type)) == -1) { M_ERROR(MODULE_NAME, "Unable to write msg type: %s", strerror(errno)); return -1; } // write channel id - if(write(fd,&msg->header.channel_id, sizeof(msg->header.channel_id)) == -1) + if(guard_write(fd,&msg->header.channel_id, sizeof(msg->header.channel_id)) == -1) { M_ERROR(MODULE_NAME, "Unable to write msg channel id: %s", strerror(errno)); return -1; } //write client id - if(write(fd,&msg->header.client_id, sizeof(msg->header.client_id)) == -1) + if(guard_write(fd,&msg->header.client_id, sizeof(msg->header.client_id)) == -1) { M_ERROR(MODULE_NAME, "Unable to write msg client id: %s", strerror(errno)); return -1; } // write payload len - if(write(fd,&msg->header.size, sizeof(msg->header.size)) == -1) + if(guard_write(fd,&msg->header.size, sizeof(msg->header.size)) == -1) { M_ERROR(MODULE_NAME, "Unable to write msg payload length: %s", strerror(errno)); return -1; @@ -180,14 +227,14 @@ int msg_write(int fd, tunnel_msg_t* msg) // write payload data if(msg->header.size > 0) { - if(write(fd,msg->data, msg->header.size) == -1) + if(guard_write(fd,msg->data, msg->header.size) == -1) { M_ERROR(MODULE_NAME, "Unable to write msg payload: %s", strerror(errno)); return -1; } } number = MSG_MAGIC_END; - if(write(fd,&number, sizeof(number)) == -1) + if(guard_write(fd,&number, sizeof(number)) == -1) { M_ERROR(MODULE_NAME, "Unable to write end magic number: %s", strerror(errno)); return -1; diff --git a/v4l2cam/v4l2cam.c b/v4l2cam/v4l2cam.c index 9c694e6..af33510 100644 --- a/v4l2cam/v4l2cam.c +++ b/v4l2cam/v4l2cam.c @@ -13,6 +13,7 @@ #include #include #include +#include #include "../tunnel.h" @@ -27,6 +28,7 @@ typedef struct uint8_t jpeg_quality; uint8_t *raw_buffer; int fd; + int timerfd; int raw_size; char dev_name[DEV_SIZE]; uint8_t queued; @@ -313,6 +315,43 @@ static int cam_cleanup(cam_setting_t *opts, int close_fd) { (void)close(opts->fd); } + if (close_fd && opts->timerfd > 0) + { + (void)close(opts->timerfd); + } + return 0; +} +static int cam_init_timer(cam_setting_t* opts) +{ + long period = 0; + if(opts->timerfd != -1) + { + (void) close(opts->timerfd); + } + opts->timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC); + if (opts->timerfd == -1) + { + M_ERROR(MODULE_NAME, "Unable to create timerfd: %s", strerror(errno)); + return -1; + } + else + { + period = (long) (1e9 / opts->fps); + M_LOG(MODULE_NAME, "Frame period set to %lu", period); + struct itimerspec ch_period = + { + .it_interval = {.tv_sec = 0, .tv_nsec = period}, + .it_value = {.tv_sec = 0, .tv_nsec = period}, /* first wake-up = interval */ + }; + + if (timerfd_settime(opts->timerfd, 0 /* no flags */, &ch_period, NULL) == -1) + { + M_ERROR(MODULE_NAME, "Unable to set framerate period: %s", strerror(errno)); + (void)close(opts->timerfd); + opts->timerfd = -1; + return -1; + } + } return 0; } static int cam_apply_setting(cam_setting_t *opts) @@ -349,6 +388,8 @@ static int cam_apply_setting(cam_setting_t *opts) M_ERROR(MODULE_NAME, "Unable to query buffer"); return -1; } + (void) cam_init_timer(opts); + return 0; } @@ -379,6 +420,7 @@ int main(const int argc, const char **argv) tunnel_msg_t msg; int status; fd_set fd_in; + uint64_t expirations_count; void *fargv[2]; unsigned int offset = 0; if (argc != 4) @@ -404,7 +446,7 @@ int main(const int argc, const char **argv) { exit(1); } - sock = open_unix_socket((char*)argv[1]); + sock = open_unix_socket((char *)argv[1]); if (sock == -1) { M_ERROR(MODULE_NAME, "Unable to open the hotline: %s", argv[1]); @@ -471,6 +513,12 @@ int main(const int argc, const char **argv) video_setting.queued = 1; } } + if(clients == NULL && video_setting.timerfd != -1) + { + (void) close(video_setting.timerfd); + video_setting.timerfd = -1; + } + status = select(maxfd + 1, &fd_in, NULL, NULL, NULL); switch (status) { @@ -494,7 +542,23 @@ int main(const int argc, const char **argv) { case CHANNEL_SUBSCRIBE: M_LOG(MODULE_NAME, "Client %d subscribes to the chanel", msg.header.client_id); + if(clients == NULL) + { + (void) cam_init_timer(&video_setting); + } clients = bst_insert(clients, msg.header.client_id, NULL); + // send back the ctl message + msg.header.type = CHANNEL_CTRL; + msg.header.size = 6; + msg.data = (uint8_t *)buff; + (void)memcpy(buff, &video_setting.width, sizeof(video_setting.width)); + (void)memcpy(buff + sizeof(video_setting.width), &video_setting.height, sizeof(video_setting.height)); + buff[sizeof(video_setting.width) + sizeof(video_setting.height)] = video_setting.fps; + buff[sizeof(video_setting.width) + sizeof(video_setting.height) + 1] = video_setting.jpeg_quality; + if (msg_write(sock, &msg) == -1) + { + running = 0; + } break; case CHANNEL_UNSUBSCRIBE: @@ -515,10 +579,10 @@ int main(const int argc, const char **argv) offset++; (void)memcpy(&video_setting.jpeg_quality, msg.data + offset, 1); M_LOG(MODULE_NAME, "Client request width: %d, height: %d, FPS: %d, JPEG quality: %d", - video_setting.width, - video_setting.height, - video_setting.fps, - video_setting.jpeg_quality); + video_setting.width, + video_setting.height, + video_setting.fps, + video_setting.jpeg_quality); if (cam_apply_setting(&video_setting) == -1) { M_ERROR(MODULE_NAME, "Unable to apply video setting"); @@ -532,6 +596,20 @@ int main(const int argc, const char **argv) { running = 0; } + else + { + // send back the ctl message + msg.header.type = CHANNEL_CTRL; + msg.header.size = 6; + msg.data = (uint8_t *)buff; + (void)memcpy(buff, &video_setting.width, sizeof(video_setting.width)); + (void)memcpy(buff + sizeof(video_setting.width), &video_setting.height, sizeof(video_setting.height)); + buff[sizeof(video_setting.width) + sizeof(video_setting.height)] = video_setting.fps; + buff[sizeof(video_setting.width) + sizeof(video_setting.height) + 1] = video_setting.jpeg_quality; + fargv[0] = (void *)&msg; + fargv[1] = (void *)&sock; + bst_for_each(clients, send_data, fargv, 2); + } } } else @@ -553,6 +631,21 @@ int main(const int argc, const char **argv) { running = 0; } + else + { + // check timeout + if(video_setting.timerfd > 0) + { + if(read(video_setting.timerfd, &expirations_count, sizeof(expirations_count)) != (int)sizeof(expirations_count)) + { + M_ERROR(MODULE_NAME, "Unable to read timer: %s", strerror(errno)); + } + else if (expirations_count > 1u) + { + M_ERROR(MODULE_NAME, "LOOP OVERFLOW COUNT: %llu", expirations_count); + } + } + } } else {