Commit 63bf56d0 authored by Mikhail Karpenko's avatar Mikhail Karpenko

Process commands in a thread

parent 73ecbfc7
...@@ -231,6 +231,7 @@ void camogm_init(camogm_state *state, char *pipe_name) ...@@ -231,6 +231,7 @@ void camogm_init(camogm_state *state, char *pipe_name)
state->rawdev.curr_pos_w = state->rawdev.start_pos; state->rawdev.curr_pos_w = state->rawdev.start_pos;
state->rawdev.curr_pos_r = state->rawdev.start_pos; state->rawdev.curr_pos_r = state->rawdev.start_pos;
state->active_chn = ALL_CHN_ACTIVE; state->active_chn = ALL_CHN_ACTIVE;
state->rawdev.mmap_size = MMAP_CHUNK_SIZE;
} }
/** /**
...@@ -802,7 +803,7 @@ void camogm_set_prefix(camogm_state *state, const char * p, path_type type) ...@@ -802,7 +803,7 @@ void camogm_set_prefix(camogm_state *state, const char * p, path_type type)
D0(fprintf(debug_file, "WARNING: raw device write initiated\n")); D0(fprintf(debug_file, "WARNING: raw device write initiated\n"));
state->rawdev_op = 1; state->rawdev_op = 1;
/* debug code follows */ /* debug code follows */
// state->rawdev.end_pos = 10485760; // 10 Mib state->rawdev.end_pos = 10485760; // 10 Mib
/* end of debug code */ /* end of debug code */
} }
} }
...@@ -954,6 +955,8 @@ void camogm_status(camogm_state *state, char * fn, int xml) ...@@ -954,6 +955,8 @@ void camogm_status(camogm_state *state, char * fn, int xml)
case STATE_READING: case STATE_READING:
_state = "reading"; _state = "reading";
break; break;
default:
_state = "stopped";
} }
_output_format = state->format ? ((state->format == CAMOGM_FORMAT_OGM) ? "ogm" : _output_format = state->format ? ((state->format == CAMOGM_FORMAT_OGM) ? "ogm" :
((state->format == CAMOGM_FORMAT_JPEG) ? "jpeg" : ((state->format == CAMOGM_FORMAT_JPEG) ? "jpeg" :
...@@ -1006,7 +1009,7 @@ void camogm_status(camogm_state *state, char * fn, int xml) ...@@ -1006,7 +1009,7 @@ void camogm_status(camogm_state *state, char * fn, int xml)
" <raw_device_path>\"%s\"</raw_device_path>\n" \ " <raw_device_path>\"%s\"</raw_device_path>\n" \
" <raw_device_overruns>%d</raw_device_overruns>\n" \ " <raw_device_overruns>%d</raw_device_overruns>\n" \
" <raw_device_pos_write>0x%llx</raw_dev_pos_write>\n" \ " <raw_device_pos_write>0x%llx</raw_dev_pos_write>\n" \
" <raw_device_pos_read>0x%llx (%d\% done)</raw_device_pos_read>\n", " <raw_device_pos_read>0x%llx (%d%% done)</raw_device_pos_read>\n",
_state, state->path, state->frameno, state->start_after_timestamp, _dur, _udur, _len, \ _state, state->path, state->frameno, state->start_after_timestamp, _dur, _udur, _len, \
_frames_skip, _sec_skip, \ _frames_skip, _sec_skip, \
state->width, state->height, _output_format, _using_exif, \ state->width, state->height, _output_format, _using_exif, \
...@@ -1067,7 +1070,7 @@ void camogm_status(camogm_state *state, char * fn, int xml) ...@@ -1067,7 +1070,7 @@ void camogm_status(camogm_state *state, char * fn, int xml)
fprintf(f, "raw device overruns\t%d\n", state->rawdev.overrun); fprintf(f, "raw device overruns\t%d\n", state->rawdev.overrun);
fprintf(f, "raw write position \t0x%llx\n", state->rawdev.curr_pos_w); fprintf(f, "raw write position \t0x%llx\n", state->rawdev.curr_pos_w);
fprintf(f, "raw read position \t0x%llx\n", state->rawdev.curr_pos_r); fprintf(f, "raw read position \t0x%llx\n", state->rawdev.curr_pos_r);
fprintf(f, " percent done \t%d\%\n", _percent_done); fprintf(f, " percent done \t%d%%\n", _percent_done);
fprintf(f, "max file duration \t%d sec\n", state->segment_duration); fprintf(f, "max file duration \t%d sec\n", state->segment_duration);
fprintf(f, "max file length \t%d B\n", state->segment_length); fprintf(f, "max file length \t%d B\n", state->segment_length);
fprintf(f, "max frames \t%d\n", state->max_frames); fprintf(f, "max frames \t%d\n", state->max_frames);
...@@ -1498,7 +1501,7 @@ int listener_loop(camogm_state *state) ...@@ -1498,7 +1501,7 @@ int listener_loop(camogm_state *state)
state->rawdev.thread_finished = false; state->rawdev.thread_finished = false;
state->prog_state = STATE_STOPPED; state->prog_state = STATE_STOPPED;
pthread_join(state->rawdev.tid, &tret); pthread_join(state->rawdev.tid, &tret);
if ((int)tret != 0 && (int)tret != PTHREAD_CANCELED) { if ((int)tret != 0 && (int)tret != (int)PTHREAD_CANCELED) {
D0(fprintf(debug_file, "Reading thread returned error %d\n", (int)tret)); D0(fprintf(debug_file, "Reading thread returned error %d\n", (int)tret));
} else { } else {
D3(fprintf(debug_file, "Reading thread stopped\n")); D3(fprintf(debug_file, "Reading thread stopped\n"));
...@@ -1509,7 +1512,8 @@ int listener_loop(camogm_state *state) ...@@ -1509,7 +1512,8 @@ int listener_loop(camogm_state *state)
} else if (state->rawdev.thread_state == STATE_STOPPED) { } else if (state->rawdev.thread_state == STATE_STOPPED) {
state->rawdev.thread_state = STATE_RUNNING; state->rawdev.thread_state = STATE_RUNNING;
state->rawdev.thread_finished = false; state->rawdev.thread_finished = false;
if (pthread_create(&state->rawdev.tid, NULL, build_index, state) != 0) { // if (pthread_create(&state->rawdev.tid, NULL, build_index, state) != 0) {
if (pthread_create(&state->rawdev.tid, NULL, reader, state) != 0) {
state->prog_state = STATE_STOPPED; state->prog_state = STATE_STOPPED;
state->rawdev.thread_state = STATE_STOPPED; state->rawdev.thread_state = STATE_STOPPED;
D0(fprintf(debug_file, "%s:line %d: Can not start new thread, disk index is not built\n", __FILE__, __LINE__)); D0(fprintf(debug_file, "%s:line %d: Can not start new thread, disk index is not built\n", __FILE__, __LINE__));
......
...@@ -60,6 +60,7 @@ ...@@ -60,6 +60,7 @@
#define RAWDEV_START_OFFSET 1024 #define RAWDEV_START_OFFSET 1024
/** @brief Maximum length of file or raw device path */ /** @brief Maximum length of file or raw device path */
#define ELPHEL_PATH_MAX 300 #define ELPHEL_PATH_MAX 300
#define MMAP_CHUNK_SIZE 134217728
/** /**
* @enum state_flags * @enum state_flags
...@@ -99,6 +100,8 @@ typedef struct { ...@@ -99,6 +100,8 @@ typedef struct {
uint64_t start_pos; uint64_t start_pos;
uint64_t end_pos; uint64_t end_pos;
uint64_t curr_pos_w; uint64_t curr_pos_w;
uint64_t *disk_mmap;
uint64_t mmap_size;
volatile uint64_t curr_pos_r; volatile uint64_t curr_pos_r;
uint64_t file_start; uint64_t file_start;
pthread_t tid; pthread_t tid;
......
...@@ -33,6 +33,9 @@ ...@@ -33,6 +33,9 @@
#include <ctype.h> #include <ctype.h>
#include <asm/byteorder.h> #include <asm/byteorder.h>
#include <sys/statvfs.h> #include <sys/statvfs.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include "camogm_read.h" #include "camogm_read.h"
...@@ -41,6 +44,9 @@ ...@@ -41,6 +44,9 @@
/** @brief Separator character between seconds and microseconds in JPEG file name */ /** @brief Separator character between seconds and microseconds in JPEG file name */
#define SUBSEC_SEPARATOR '.' #define SUBSEC_SEPARATOR '.'
#define EXIF_DATE_TIME_FORMAT "%Y:%m:%d %H:%M:%S" #define EXIF_DATE_TIME_FORMAT "%Y:%m:%d %H:%M:%S"
#define CMD_DELIMITER "/?"
#define CMD_BUFF_LEN 1024
#define COMMAND_LOOP_DELAY 500000
/** @brief The size of read buffer in bytes. The data will be read from disk in blocks of this size */ /** @brief The size of read buffer in bytes. The data will be read from disk in blocks of this size */
#define PHY_BLK_SZ 4096 #define PHY_BLK_SZ 4096
/** @brief Include or exclude file start and stop markers from resulting file. This must be set to 1 for JPEG files */ /** @brief Include or exclude file start and stop markers from resulting file. This must be set to 1 for JPEG files */
...@@ -60,6 +66,26 @@ static const struct iovec elphel_en = { ...@@ -60,6 +66,26 @@ static const struct iovec elphel_en = {
.iov_len = sizeof(elphelen) .iov_len = sizeof(elphelen)
}; };
#define COMMAND_TABLE \
X(CMD_BUILD_INDEX, "build_index") \
X(CMD_GET_INDEX, "get_index") \
X(CMD_READ_DISK, "read_disk") \
X(CMD_READ_FILE, "read_file") \
X(CMD_READ_ALL_FILES, "read_all_files") \
X(CMD_STATUS, "status")
#define X(a, b) a,
enum socket_commands {
COMMAND_TABLE
};
#undef X
#define X(a, b) b,
static const char *cmd_list[] = {
COMMAND_TABLE
};
#undef X
/** /**
* @enum file_result * @enum file_result
* @brief Return codes for file operations * @brief Return codes for file operations
...@@ -122,6 +148,16 @@ enum search_state { ...@@ -122,6 +148,16 @@ enum search_state {
SEARCH_FILE_START, SEARCH_FILE_START,
SEARCH_FILE_DATA SEARCH_FILE_DATA
}; };
//enum sock_commands {
// CMD_BUILD_INDEX,
// CMD_GET_INDEX,
// CMD_READ_DISK,
// CMD_READ_FILE,
// CMD_READ_ALL_FILES,
// CMD_STATUS
//};
/** /**
* @brief Exif data format table. * @brief Exif data format table.
* *
...@@ -219,6 +255,8 @@ struct exit_state { ...@@ -219,6 +255,8 @@ struct exit_state {
int ret_val; int ret_val;
}; };
static inline void exit_thread(void *arg);
void dump_index_dir(const struct disk_idir *idir) void dump_index_dir(const struct disk_idir *idir)
{ {
struct disk_index *ind = idir->head; struct disk_index *ind = idir->head;
...@@ -540,39 +578,39 @@ int stop_index(struct disk_idir *idir, uint64_t pos_stop) ...@@ -540,39 +578,39 @@ int stop_index(struct disk_idir *idir, uint64_t pos_stop)
* @param[in] f_op pointer to a structure holding information about currently opened file * @param[in] f_op pointer to a structure holding information about currently opened file
* @return \e FILE_OK if file was successfully opened and negative error code otherwise * @return \e FILE_OK if file was successfully opened and negative error code otherwise
*/ */
static int start_new_file(struct file_opts *f_op) //static int start_new_file(struct file_opts *f_op)
{ //{
int ret; // int ret;
int err; // int err;
struct statvfs vfs; // struct statvfs vfs;
uint64_t free_size = 0; // uint64_t free_size = 0;
char file_name[ELPHEL_PATH_MAX] = {0}; // char file_name[ELPHEL_PATH_MAX] = {0};
//
memset(&vfs, 0, sizeof(struct statvfs)); // memset(&vfs, 0, sizeof(struct statvfs));
ret = statvfs(f_op->state->path_prefix, &vfs); // ret = statvfs(f_op->state->path_prefix, &vfs);
if (ret != 0) { // if (ret != 0) {
D0(fprintf(debug_file, "Unable to get free size on disk, statvfs() returned %d\n", ret)); // D0(fprintf(debug_file, "Unable to get free size on disk, statvfs() returned %d\n", ret));
return -CAMOGM_FRAME_FILE_ERR; // return -CAMOGM_FRAME_FILE_ERR;
} // }
free_size = (uint64_t)vfs.f_bsize * (uint64_t)vfs.f_bfree; // free_size = (uint64_t)vfs.f_bsize * (uint64_t)vfs.f_bfree;
// statvfs can return irrelevant values in some fields for unsupported file systems, // // statvfs can return irrelevant values in some fields for unsupported file systems,
// thus free_size is checked to be equal to non-zero value // // thus free_size is checked to be equal to non-zero value
if (free_size > 0 && free_size < FREE_SIZE_LIMIT) { // if (free_size > 0 && free_size < FREE_SIZE_LIMIT) {
return -CAMOGM_NO_SPACE; // return -CAMOGM_NO_SPACE;
} // }
//
// make_fname(f_op->state, file_name); //// make_fname(f_op->state, file_name);
sprintf(f_op->state->path, "%s%s", f_op->state->path_prefix, file_name); // sprintf(f_op->state->path, "%s%s", f_op->state->path_prefix, file_name);
//
if ((f_op->fh = fopen(f_op->state->path, "w")) == NULL) { // if ((f_op->fh = fopen(f_op->state->path, "w")) == NULL) {
err = errno; // err = errno;
D0(fprintf(debug_file, "Error opening %s for writing\n", file_name)); // D0(fprintf(debug_file, "Error opening %s for writing\n", file_name));
D0(fprintf(debug_file, "%s\n", strerror(err))); // D0(fprintf(debug_file, "%s\n", strerror(err)));
return -CAMOGM_FRAME_FILE_ERR; // return -CAMOGM_FRAME_FILE_ERR;
} // }
//
return FILE_OK; // return FILE_OK;
} //}
/** /**
* @brief Detect cases when file marker crosses read buffer boundary * @brief Detect cases when file marker crosses read buffer boundary
...@@ -636,56 +674,221 @@ static int check_edge_case(const struct iovec *from, const struct iovec *to, con ...@@ -636,56 +674,221 @@ static int check_edge_case(const struct iovec *from, const struct iovec *to, con
* @param[in] to end pointer to data buffer * @param[in] to end pointer to data buffer
* @return a constant of #file_result type * @return a constant of #file_result type
*/ */
static int write_buffer(struct file_opts *f_op, unsigned char *from, unsigned char *to) //static int write_buffer(struct file_opts *f_op, unsigned char *from, unsigned char *to)
//{
// int ret = FILE_OK;
// int len;
// unsigned int sz;
//
// sz = to - from;
// switch (f_op->file_state) {
// case WRITE_RUNNING:
// len = fwrite(from, sz, 1, f_op->fh);
// if (len != 1) {
// perror(__func__);
// ret = FILE_WR_ERR;
// }
// break;
// case WRITE_START:
// if ((ret = start_new_file(f_op)) == FILE_OK) {
// len = fwrite(from, sz, 1, f_op->fh);
// if (len != 1) {
// perror(__func__);
// ret = FILE_WR_ERR;
// }
// } else {
// if (ret == -CAMOGM_NO_SPACE)
// D0(fprintf(debug_file, "No free space left on the disk\n"));
// f_op->fh = NULL;
// ret = FILE_OPEN_ERR;
// }
// break;
// case WRITE_STOP:
// len = fwrite(from, sz, 1, f_op->fh);
// if (len != 1) {
// perror(__func__);
// ret = FILE_WR_ERR;
// }
// if (fclose(f_op->fh) != 0) {
// perror(__func__);
// ret = FILE_CLOSE_ERR;
// } else {
// f_op->fh = NULL;
// f_op->file_cntr++;
// }
// break;
// default:
// ret = FILE_OPT_ERR;
// }
// return ret;
//}
void send_buffer(int sockfd, unsigned char *buff, size_t sz)
{ {
int ret = FILE_OK; size_t bytes_left = sz;
int len; size_t bytes_written = 0;
unsigned int sz; size_t offset = 0;
sz = to - from; while (bytes_left > 0) {
switch (f_op->file_state) { bytes_written = write(sockfd, &buff[offset], bytes_left);
case WRITE_RUNNING: bytes_left -= bytes_written;
len = fwrite(from, sz, 1, f_op->fh); offset += bytes_written;
if (len != 1) {
perror(__func__);
ret = FILE_WR_ERR;
} }
break; }
case WRITE_START:
if ((ret = start_new_file(f_op)) == FILE_OK) { int mmap_disk(rawdev_buffer *rawdev)
len = fwrite(from, sz, 1, f_op->fh); {
if (len != 1) { int ret = 0;
perror(__func__);
ret = FILE_WR_ERR; rawdev->rawdev_fd = open(rawdev->rawdev_path, O_RDONLY);
if (rawdev->rawdev_fd < 0) {
return -1;
} }
} else { rawdev->disk_mmap = mmap(0, rawdev->mmap_size, PROT_READ, MAP_SHARED, rawdev->rawdev_fd, rawdev->start_pos);
if (ret == -CAMOGM_NO_SPACE) if (rawdev->disk_mmap == MAP_FAILED) {
D0(fprintf(debug_file, "No free space left on the disk\n")); close(rawdev->rawdev_fd);
f_op->fh = NULL; return -1;
ret = FILE_OPEN_ERR;
} }
return ret;
}
int unmmap_disk(rawdev_buffer *rawdev)
{
int ret = 0;
if (munmap(rawdev->disk_mmap, rawdev->mmap_size) != 0)
return -1;
if (close(rawdev->rawdev_fd) != 0)
return -1;
return ret;
}
#define PORT_NUMBER 3456
void prep_socket(int *socket_fd)
{
int opt = 1;
struct sockaddr_in sock;
memset((char *)&sock, 0, sizeof(struct sockaddr_in));
sock.sin_family = AF_INET;
sock.sin_port = htons(PORT_NUMBER);
*socket_fd = socket(AF_INET, SOCK_STREAM, 0);
setsockopt(*socket_fd, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt));
bind(*socket_fd, (struct sockaddr *) &sock, sizeof(struct sockaddr_in));
listen(*socket_fd, 10);
}
/**
*
* @param cmd
* cmd pointer is updated to point to current command
* @return -1 command not recognized, -2 end of buffer
*/
int parse_command(char **cmd)
{
size_t cmd_len;
int cmd_indx = -1;
char *char_ptr;
D6(fprintf(debug_file, "Parsing command line: %s\n", *cmd));
char_ptr = strpbrk(*cmd, CMD_DELIMITER);
if (char_ptr != NULL) {
char_ptr[0] = '\0';
char_ptr++;
for (int i = 0; i < sizeof(cmd_list) / sizeof(cmd_list[0]); i++) {
cmd_len = strlen(cmd_list[i]);
if (strncmp(char_ptr, cmd_list[i], cmd_len) == 0) {
cmd_indx = i;
break; break;
case WRITE_STOP: }
len = fwrite(from, sz, 1, f_op->fh); }
if (len != 1) { *cmd = char_ptr;
perror(__func__);
ret = FILE_WR_ERR;
}
if (fclose(f_op->fh) != 0) {
perror(__func__);
ret = FILE_CLOSE_ERR;
} else { } else {
f_op->fh = NULL; cmd_indx = -2;
f_op->file_cntr++;
} }
return cmd_indx;
}
/**
* @brief Break HTTP GET string after the command part as we do not need that part. The function
* finds the first space character after the command part starts and replaces it with null.
* @param[in,out] cmd pointer to HTTP GET string
*/
void trim_command(char *cmd, ssize_t cmd_len)
{
char *ptr_start, *ptr_end;
if (cmd_len >= 0 && cmd_len < CMD_BUFF_LEN)
cmd[cmd_len] = '\0';
ptr_start = strpbrk(cmd, CMD_DELIMITER);
if (ptr_start) {
ptr_end = strchr(ptr_start, ' ');
if (ptr_end)
ptr_end[0] = '\0';
}
}
/**
*
* @param arg
* @todo print unrecognized command
*/
void *reader(void *arg)
{
int sockfd, fd;
int cmd;
char cmd_buff[CMD_BUFF_LEN] = {0};
char *cmd_ptr;
ssize_t cmd_len;
camogm_state *state = (camogm_state *)arg;
rawdev_buffer *rawdev = &state->rawdev;
unsigned char *disk_buff = NULL;
struct exit_state exit_state = {
.state = state,
.ret_val = 0
};
prep_socket(&sockfd);
while (true) {
fd = accept(sockfd, NULL, 0);
if (fd == -1)
continue;
cmd_len = read(fd, cmd_buff, sizeof(cmd_buff) - 1);
cmd_ptr = cmd_buff;
trim_command(cmd_ptr, cmd_len);
while ((cmd = parse_command(&cmd_ptr)) != -2) {
if (cmd >= 0)
D6(fprintf(debug_file, "Got command '%s'\n", cmd_list[cmd]));
switch (cmd) {
case CMD_BUILD_INDEX:
break;
case CMD_GET_INDEX:
break;
case CMD_READ_DISK:
break;
case CMD_READ_FILE:
break;
case CMD_READ_ALL_FILES:
break;
case CMD_STATUS:
break; break;
default: default:
ret = FILE_OPT_ERR; D0(fprintf(debug_file, "Unrecognized command is skipped\n"));
} }
return ret; }
fprintf(debug_file, "Closing connection\n");
close(fd);
usleep(COMMAND_LOOP_DELAY);
}
return (void *) 0;
} }
static inline void *exit_thread(void *arg) static inline void exit_thread(void *arg)
{ {
struct exit_state *s = (struct exit_state *)arg; struct exit_state *s = (struct exit_state *)arg;
...@@ -694,7 +897,6 @@ static inline void *exit_thread(void *arg) ...@@ -694,7 +897,6 @@ static inline void *exit_thread(void *arg)
s->state->rawdev.rawdev_fd = -1; s->state->rawdev.rawdev_fd = -1;
} }
s->state->rawdev.thread_finished = true; s->state->rawdev.thread_finished = true;
return (void *) s->ret_val;
} }
/** /**
......
...@@ -23,7 +23,7 @@ struct disk_index { ...@@ -23,7 +23,7 @@ struct disk_index {
struct disk_index *next; struct disk_index *next;
struct disk_index *prev; struct disk_index *prev;
time_t rawtime; time_t rawtime;
useconds_t usec; unsigned int usec;
uint32_t port; uint32_t port;
size_t f_size; size_t f_size;
uint64_t f_offset; uint64_t f_offset;
...@@ -36,5 +36,6 @@ struct disk_idir { ...@@ -36,5 +36,6 @@ struct disk_idir {
}; };
void *build_index(void *arg); void *build_index(void *arg);
void *reader(void *arg);
#endif /* _CAMOGM_READ_H */ #endif /* _CAMOGM_READ_H */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment