mirror of
https://github.com/lxsang/antd-tunnel-publishers
synced 2024-12-26 18:08:21 +01:00
update publisers
This commit is contained in:
parent
5b6db3eb75
commit
1f2ef8db4d
BIN
dist/antd-publishers-0.1.0a.tar.gz
vendored
BIN
dist/antd-publishers-0.1.0a.tar.gz
vendored
Binary file not shown.
77
tunnel.c
77
tunnel.c
@ -11,11 +11,58 @@
|
|||||||
|
|
||||||
#define MODULE_NAME "api"
|
#define MODULE_NAME "api"
|
||||||
|
|
||||||
|
static int guard_read(int fd, void* buffer, size_t size)
|
||||||
|
{
|
||||||
|
int n = 0;
|
||||||
|
int read_len;
|
||||||
|
int st;
|
||||||
|
while(n != (int)size)
|
||||||
|
{
|
||||||
|
read_len = (int)size - n;
|
||||||
|
st = read(fd,buffer + n,read_len);
|
||||||
|
if(st == -1)
|
||||||
|
{
|
||||||
|
M_ERROR(MODULE_NAME, "Unable to read from #%d: %s", fd, strerror(errno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if(st == 0)
|
||||||
|
{
|
||||||
|
M_ERROR(MODULE_NAME,"Endpoint %d is closed", fd);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
n += st;
|
||||||
|
}
|
||||||
|
return n;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int guard_write(int fd, void* buffer, size_t size)
|
||||||
|
{
|
||||||
|
int n = 0;
|
||||||
|
int write_len;
|
||||||
|
int st;
|
||||||
|
while(n != (int)size)
|
||||||
|
{
|
||||||
|
write_len = (int)size - n;
|
||||||
|
st = write(fd,buffer + n,write_len);
|
||||||
|
if(st == -1)
|
||||||
|
{
|
||||||
|
M_ERROR(MODULE_NAME,"Unable to write to #%d: %s", fd, strerror(errno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if(st == 0)
|
||||||
|
{
|
||||||
|
M_ERROR(MODULE_NAME,"Endpoint %d is closed", fd);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
n += st;
|
||||||
|
}
|
||||||
|
return n;
|
||||||
|
}
|
||||||
|
|
||||||
static int msg_check_number(int fd, uint16_t number)
|
static int msg_check_number(int fd, uint16_t number)
|
||||||
{
|
{
|
||||||
uint16_t value;
|
uint16_t value;
|
||||||
if(read(fd,&value,sizeof(value)) == -1)
|
if(guard_read(fd,&value,sizeof(value)) == -1)
|
||||||
{
|
{
|
||||||
M_ERROR(MODULE_NAME, "Unable to read integer value: %s", strerror(errno));
|
M_ERROR(MODULE_NAME, "Unable to read integer value: %s", strerror(errno));
|
||||||
return -1;
|
return -1;
|
||||||
@ -30,7 +77,7 @@ static int msg_check_number(int fd, uint16_t number)
|
|||||||
static int msg_read_string(int fd, char* buffer, uint8_t max_length)
|
static int msg_read_string(int fd, char* buffer, uint8_t max_length)
|
||||||
{
|
{
|
||||||
uint8_t size;
|
uint8_t size;
|
||||||
if(read(fd,&size,sizeof(size)) == -1)
|
if(guard_read(fd,&size,sizeof(size)) == -1)
|
||||||
{
|
{
|
||||||
M_ERROR(MODULE_NAME, "Unable to read string size: %s", strerror(errno));
|
M_ERROR(MODULE_NAME, "Unable to read string size: %s", strerror(errno));
|
||||||
return -1;
|
return -1;
|
||||||
@ -40,7 +87,7 @@ static int msg_read_string(int fd, char* buffer, uint8_t max_length)
|
|||||||
M_ERROR(MODULE_NAME, "String length exceeds the maximal value of %d", max_length);
|
M_ERROR(MODULE_NAME, "String length exceeds the maximal value of %d", max_length);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if(read(fd,buffer,size) == -1)
|
if(guard_read(fd,buffer,size) == -1)
|
||||||
{
|
{
|
||||||
M_ERROR(MODULE_NAME, "Unable to read string to buffer: %s", strerror(errno));
|
M_ERROR(MODULE_NAME, "Unable to read string to buffer: %s", strerror(errno));
|
||||||
return -1;
|
return -1;
|
||||||
@ -51,7 +98,7 @@ static int msg_read_string(int fd, char* buffer, uint8_t max_length)
|
|||||||
static uint8_t* msg_read_payload(int fd, uint32_t* size)
|
static uint8_t* msg_read_payload(int fd, uint32_t* size)
|
||||||
{
|
{
|
||||||
uint8_t* data;
|
uint8_t* data;
|
||||||
if(read(fd,size,sizeof(*size)) == -1)
|
if(guard_read(fd,size,sizeof(*size)) == -1)
|
||||||
{
|
{
|
||||||
M_ERROR(MODULE_NAME, "Unable to read payload data size: %s", strerror(errno));
|
M_ERROR(MODULE_NAME, "Unable to read payload data size: %s", strerror(errno));
|
||||||
return NULL;
|
return NULL;
|
||||||
@ -67,7 +114,7 @@ static uint8_t* msg_read_payload(int fd, uint32_t* size)
|
|||||||
M_ERROR(MODULE_NAME, "Unable to allocate memory for payload data: %s", strerror(errno));
|
M_ERROR(MODULE_NAME, "Unable to allocate memory for payload data: %s", strerror(errno));
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
if(read(fd,data,*size) == -1)
|
if(guard_read(fd,data,*size) == -1)
|
||||||
{
|
{
|
||||||
M_ERROR(MODULE_NAME, "Unable to read payload data to buffer: %s", strerror(errno));
|
M_ERROR(MODULE_NAME, "Unable to read payload data to buffer: %s", strerror(errno));
|
||||||
free(data);
|
free(data);
|
||||||
@ -106,7 +153,7 @@ int msg_read(int fd, tunnel_msg_t* msg)
|
|||||||
M_ERROR(MODULE_NAME, "Unable to check begin magic number");
|
M_ERROR(MODULE_NAME, "Unable to check begin magic number");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if(read(fd,&msg->header.type,sizeof(msg->header.type)) == -1)
|
if(guard_read(fd,&msg->header.type,sizeof(msg->header.type)) == -1)
|
||||||
{
|
{
|
||||||
M_ERROR(MODULE_NAME, "Unable to read msg type: %s", strerror(errno));
|
M_ERROR(MODULE_NAME, "Unable to read msg type: %s", strerror(errno));
|
||||||
return -1;
|
return -1;
|
||||||
@ -116,12 +163,12 @@ int msg_read(int fd, tunnel_msg_t* msg)
|
|||||||
M_ERROR(MODULE_NAME, "Unknown msg type: %d", msg->header.type);
|
M_ERROR(MODULE_NAME, "Unknown msg type: %d", msg->header.type);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if(read(fd, &msg->header.channel_id, sizeof(msg->header.channel_id)) == -1)
|
if(guard_read(fd, &msg->header.channel_id, sizeof(msg->header.channel_id)) == -1)
|
||||||
{
|
{
|
||||||
M_ERROR(MODULE_NAME, "Unable to read msg channel id");
|
M_ERROR(MODULE_NAME, "Unable to read msg channel id");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if(read(fd, &msg->header.client_id, sizeof(msg->header.client_id)) == -1)
|
if(guard_read(fd, &msg->header.client_id, sizeof(msg->header.client_id)) == -1)
|
||||||
{
|
{
|
||||||
M_ERROR(MODULE_NAME, "Unable to read msg client id");
|
M_ERROR(MODULE_NAME, "Unable to read msg client id");
|
||||||
return -1;
|
return -1;
|
||||||
@ -147,32 +194,32 @@ int msg_write(int fd, tunnel_msg_t* msg)
|
|||||||
{
|
{
|
||||||
// write begin magic number
|
// write begin magic number
|
||||||
uint16_t number = MSG_MAGIC_BEGIN;
|
uint16_t number = MSG_MAGIC_BEGIN;
|
||||||
if(write(fd,&number, sizeof(number)) == -1)
|
if(guard_write(fd,&number, sizeof(number)) == -1)
|
||||||
{
|
{
|
||||||
M_ERROR(MODULE_NAME, "Unable to write begin magic number: %s", strerror(errno));
|
M_ERROR(MODULE_NAME, "Unable to write begin magic number: %s", strerror(errno));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
// write type
|
// write type
|
||||||
if(write(fd,&msg->header.type, sizeof(msg->header.type)) == -1)
|
if(guard_write(fd,&msg->header.type, sizeof(msg->header.type)) == -1)
|
||||||
{
|
{
|
||||||
M_ERROR(MODULE_NAME, "Unable to write msg type: %s", strerror(errno));
|
M_ERROR(MODULE_NAME, "Unable to write msg type: %s", strerror(errno));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
// write channel id
|
// write channel id
|
||||||
if(write(fd,&msg->header.channel_id, sizeof(msg->header.channel_id)) == -1)
|
if(guard_write(fd,&msg->header.channel_id, sizeof(msg->header.channel_id)) == -1)
|
||||||
{
|
{
|
||||||
M_ERROR(MODULE_NAME, "Unable to write msg channel id: %s", strerror(errno));
|
M_ERROR(MODULE_NAME, "Unable to write msg channel id: %s", strerror(errno));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
//write client id
|
//write client id
|
||||||
if(write(fd,&msg->header.client_id, sizeof(msg->header.client_id)) == -1)
|
if(guard_write(fd,&msg->header.client_id, sizeof(msg->header.client_id)) == -1)
|
||||||
{
|
{
|
||||||
M_ERROR(MODULE_NAME, "Unable to write msg client id: %s", strerror(errno));
|
M_ERROR(MODULE_NAME, "Unable to write msg client id: %s", strerror(errno));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
// write payload len
|
// write payload len
|
||||||
|
|
||||||
if(write(fd,&msg->header.size, sizeof(msg->header.size)) == -1)
|
if(guard_write(fd,&msg->header.size, sizeof(msg->header.size)) == -1)
|
||||||
{
|
{
|
||||||
M_ERROR(MODULE_NAME, "Unable to write msg payload length: %s", strerror(errno));
|
M_ERROR(MODULE_NAME, "Unable to write msg payload length: %s", strerror(errno));
|
||||||
return -1;
|
return -1;
|
||||||
@ -180,14 +227,14 @@ int msg_write(int fd, tunnel_msg_t* msg)
|
|||||||
// write payload data
|
// write payload data
|
||||||
if(msg->header.size > 0)
|
if(msg->header.size > 0)
|
||||||
{
|
{
|
||||||
if(write(fd,msg->data, msg->header.size) == -1)
|
if(guard_write(fd,msg->data, msg->header.size) == -1)
|
||||||
{
|
{
|
||||||
M_ERROR(MODULE_NAME, "Unable to write msg payload: %s", strerror(errno));
|
M_ERROR(MODULE_NAME, "Unable to write msg payload: %s", strerror(errno));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
number = MSG_MAGIC_END;
|
number = MSG_MAGIC_END;
|
||||||
if(write(fd,&number, sizeof(number)) == -1)
|
if(guard_write(fd,&number, sizeof(number)) == -1)
|
||||||
{
|
{
|
||||||
M_ERROR(MODULE_NAME, "Unable to write end magic number: %s", strerror(errno));
|
M_ERROR(MODULE_NAME, "Unable to write end magic number: %s", strerror(errno));
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -13,6 +13,7 @@
|
|||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
#include <jpeglib.h>
|
#include <jpeglib.h>
|
||||||
#include <sys/ioctl.h>
|
#include <sys/ioctl.h>
|
||||||
|
#include <sys/timerfd.h>
|
||||||
|
|
||||||
#include "../tunnel.h"
|
#include "../tunnel.h"
|
||||||
|
|
||||||
@ -27,6 +28,7 @@ typedef struct
|
|||||||
uint8_t jpeg_quality;
|
uint8_t jpeg_quality;
|
||||||
uint8_t *raw_buffer;
|
uint8_t *raw_buffer;
|
||||||
int fd;
|
int fd;
|
||||||
|
int timerfd;
|
||||||
int raw_size;
|
int raw_size;
|
||||||
char dev_name[DEV_SIZE];
|
char dev_name[DEV_SIZE];
|
||||||
uint8_t queued;
|
uint8_t queued;
|
||||||
@ -313,6 +315,43 @@ static int cam_cleanup(cam_setting_t *opts, int close_fd)
|
|||||||
{
|
{
|
||||||
(void)close(opts->fd);
|
(void)close(opts->fd);
|
||||||
}
|
}
|
||||||
|
if (close_fd && opts->timerfd > 0)
|
||||||
|
{
|
||||||
|
(void)close(opts->timerfd);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
static int cam_init_timer(cam_setting_t* opts)
|
||||||
|
{
|
||||||
|
long period = 0;
|
||||||
|
if(opts->timerfd != -1)
|
||||||
|
{
|
||||||
|
(void) close(opts->timerfd);
|
||||||
|
}
|
||||||
|
opts->timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC);
|
||||||
|
if (opts->timerfd == -1)
|
||||||
|
{
|
||||||
|
M_ERROR(MODULE_NAME, "Unable to create timerfd: %s", strerror(errno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
period = (long) (1e9 / opts->fps);
|
||||||
|
M_LOG(MODULE_NAME, "Frame period set to %lu", period);
|
||||||
|
struct itimerspec ch_period =
|
||||||
|
{
|
||||||
|
.it_interval = {.tv_sec = 0, .tv_nsec = period},
|
||||||
|
.it_value = {.tv_sec = 0, .tv_nsec = period}, /* first wake-up = interval */
|
||||||
|
};
|
||||||
|
|
||||||
|
if (timerfd_settime(opts->timerfd, 0 /* no flags */, &ch_period, NULL) == -1)
|
||||||
|
{
|
||||||
|
M_ERROR(MODULE_NAME, "Unable to set framerate period: %s", strerror(errno));
|
||||||
|
(void)close(opts->timerfd);
|
||||||
|
opts->timerfd = -1;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
static int cam_apply_setting(cam_setting_t *opts)
|
static int cam_apply_setting(cam_setting_t *opts)
|
||||||
@ -349,6 +388,8 @@ static int cam_apply_setting(cam_setting_t *opts)
|
|||||||
M_ERROR(MODULE_NAME, "Unable to query buffer");
|
M_ERROR(MODULE_NAME, "Unable to query buffer");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
(void) cam_init_timer(opts);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -379,6 +420,7 @@ int main(const int argc, const char **argv)
|
|||||||
tunnel_msg_t msg;
|
tunnel_msg_t msg;
|
||||||
int status;
|
int status;
|
||||||
fd_set fd_in;
|
fd_set fd_in;
|
||||||
|
uint64_t expirations_count;
|
||||||
void *fargv[2];
|
void *fargv[2];
|
||||||
unsigned int offset = 0;
|
unsigned int offset = 0;
|
||||||
if (argc != 4)
|
if (argc != 4)
|
||||||
@ -471,6 +513,12 @@ int main(const int argc, const char **argv)
|
|||||||
video_setting.queued = 1;
|
video_setting.queued = 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if(clients == NULL && video_setting.timerfd != -1)
|
||||||
|
{
|
||||||
|
(void) close(video_setting.timerfd);
|
||||||
|
video_setting.timerfd = -1;
|
||||||
|
}
|
||||||
|
|
||||||
status = select(maxfd + 1, &fd_in, NULL, NULL, NULL);
|
status = select(maxfd + 1, &fd_in, NULL, NULL, NULL);
|
||||||
switch (status)
|
switch (status)
|
||||||
{
|
{
|
||||||
@ -494,7 +542,23 @@ int main(const int argc, const char **argv)
|
|||||||
{
|
{
|
||||||
case CHANNEL_SUBSCRIBE:
|
case CHANNEL_SUBSCRIBE:
|
||||||
M_LOG(MODULE_NAME, "Client %d subscribes to the chanel", msg.header.client_id);
|
M_LOG(MODULE_NAME, "Client %d subscribes to the chanel", msg.header.client_id);
|
||||||
|
if(clients == NULL)
|
||||||
|
{
|
||||||
|
(void) cam_init_timer(&video_setting);
|
||||||
|
}
|
||||||
clients = bst_insert(clients, msg.header.client_id, NULL);
|
clients = bst_insert(clients, msg.header.client_id, NULL);
|
||||||
|
// send back the ctl message
|
||||||
|
msg.header.type = CHANNEL_CTRL;
|
||||||
|
msg.header.size = 6;
|
||||||
|
msg.data = (uint8_t *)buff;
|
||||||
|
(void)memcpy(buff, &video_setting.width, sizeof(video_setting.width));
|
||||||
|
(void)memcpy(buff + sizeof(video_setting.width), &video_setting.height, sizeof(video_setting.height));
|
||||||
|
buff[sizeof(video_setting.width) + sizeof(video_setting.height)] = video_setting.fps;
|
||||||
|
buff[sizeof(video_setting.width) + sizeof(video_setting.height) + 1] = video_setting.jpeg_quality;
|
||||||
|
if (msg_write(sock, &msg) == -1)
|
||||||
|
{
|
||||||
|
running = 0;
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case CHANNEL_UNSUBSCRIBE:
|
case CHANNEL_UNSUBSCRIBE:
|
||||||
@ -532,6 +596,20 @@ int main(const int argc, const char **argv)
|
|||||||
{
|
{
|
||||||
running = 0;
|
running = 0;
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// send back the ctl message
|
||||||
|
msg.header.type = CHANNEL_CTRL;
|
||||||
|
msg.header.size = 6;
|
||||||
|
msg.data = (uint8_t *)buff;
|
||||||
|
(void)memcpy(buff, &video_setting.width, sizeof(video_setting.width));
|
||||||
|
(void)memcpy(buff + sizeof(video_setting.width), &video_setting.height, sizeof(video_setting.height));
|
||||||
|
buff[sizeof(video_setting.width) + sizeof(video_setting.height)] = video_setting.fps;
|
||||||
|
buff[sizeof(video_setting.width) + sizeof(video_setting.height) + 1] = video_setting.jpeg_quality;
|
||||||
|
fargv[0] = (void *)&msg;
|
||||||
|
fargv[1] = (void *)&sock;
|
||||||
|
bst_for_each(clients, send_data, fargv, 2);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@ -553,6 +631,21 @@ int main(const int argc, const char **argv)
|
|||||||
{
|
{
|
||||||
running = 0;
|
running = 0;
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// check timeout
|
||||||
|
if(video_setting.timerfd > 0)
|
||||||
|
{
|
||||||
|
if(read(video_setting.timerfd, &expirations_count, sizeof(expirations_count)) != (int)sizeof(expirations_count))
|
||||||
|
{
|
||||||
|
M_ERROR(MODULE_NAME, "Unable to read timer: %s", strerror(errno));
|
||||||
|
}
|
||||||
|
else if (expirations_count > 1u)
|
||||||
|
{
|
||||||
|
M_ERROR(MODULE_NAME, "LOOP OVERFLOW COUNT: %llu", expirations_count);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
Loading…
Reference in New Issue
Block a user