From 25813b2215658aec65397ebd0927c6c074f63e6e Mon Sep 17 00:00:00 2001 From: Oleg Dzhimiev Date: Wed, 16 Jan 2019 16:52:18 -0700 Subject: [PATCH] 1. Built in glibc 2.28 and gcc 8.2.0: mkfifo/fopen/fread/fwrite kind of worked with glibc 2.26 and gcc 7.2.0. With 2.28 named-pipe reading with fread would read the pipe only once for some reason - probably because it stopped being a 'blocking' call To fix - switched to open/read/poll combination - so none of glibc functions is used. 2. camogm_fifo_reader - test program - use with *_writer or just 'echo' 3. camogm_fifo_writer - test program --- src/Makefile | 18 +++- src/camogm.c | 222 +++++++++++++++++++++++---------------- src/camogm_fifo_reader.c | 105 ++++++++++++++++++ src/camogm_fifo_writer.c | 36 +++++++ src/camogm_read.c | 4 +- 5 files changed, 288 insertions(+), 97 deletions(-) create mode 100644 src/camogm_fifo_reader.c create mode 100644 src/camogm_fifo_writer.c diff --git a/src/Makefile b/src/Makefile index 650a838..d038aa9 100644 --- a/src/Makefile +++ b/src/Makefile @@ -1,6 +1,9 @@ GUIDIR = camogmgui PROGS = camogm -TEST_PROG = camogm_test +TEST_PROG = camogm_test +TEST_PROG1 = camogm_fifo_writer +TEST_PROG2 = camogm_fifo_reader + PHPSCRIPTS = camogmstate.php $(GUIDIR)/camogmgui.php $(GUIDIR)/camogmgui.css $(GUIDIR)/camogmgui.js $(GUIDIR)/camogm_interface.php \ $(GUIDIR)/SpryTabbedPanels.css $(GUIDIR)/SpryTabbedPanels.js $(GUIDIR)/xml_simple.php $(GUIDIR)/SpryCollapsiblePanel.css \ $(GUIDIR)/SpryCollapsiblePanel.js @@ -12,7 +15,10 @@ IMAGES = $(GUIDIR)/images/filebrowser-01.gif $(GUIDIR)/images/filebrowser-bo SRCS = camogm.c camogm_ogm.c camogm_jpeg.c camogm_mov.c camogm_kml.c camogm_read.c index_list.c camogm_align.c -TEST_SRC = camogm_test.c +TEST_SRC = camogm_test.c +TEST_SRC1 = camogm_fifo_writer.c +TEST_SRC2 = camogm_fifo_reader.c + OBJS = $(SRCS:.c=.o) CFLAGS += -Wall -I$(STAGING_DIR_HOST)/usr/include-uapi @@ -28,17 +34,21 @@ BINDIR = /usr/bin/ WWW_PAGES = /www/pages IMAGEDIR = $(WWW_PAGES)/images -all: $(PROGS) $(TEST_PROG) +all: $(PROGS) $(TEST_PROG) $(TEST_PROG1) $(TEST_PROG2) $(PROGS): $(OBJS) $(CC) $(CFLAGS) $(LDFLAGS) $^ $(LDLIBS) -o $@ -$(TEST_PROG): $(TEST_SRC:.c=.o) +$(TEST_PROG): $(TEST_SRC:.c=.o) +$(TEST_PROG1): $(TEST_SRC1:.c=.o) +$(TEST_PROG2): $(TEST_SRC2:.c=.o) install: $(PROGS) $(PHPSCRIPTS) $(CONFIGS) $(INSTALL) $(OWN) -d $(DESTDIR)$(BINDIR) $(INSTALL) $(OWN) -m $(INSTMODE) $(PROGS) $(DESTDIR)$(BINDIR) $(INSTALL) $(OWN) -m $(INSTMODE) $(TEST_PROG) $(DESTDIR)$(BINDIR) + $(INSTALL) $(OWN) -m $(INSTMODE) $(TEST_PROG1) $(DESTDIR)$(BINDIR) + $(INSTALL) $(OWN) -m $(INSTMODE) $(TEST_PROG2) $(DESTDIR)$(BINDIR) $(INSTALL) $(OWN) -d $(DESTDIR)$(SYSCONFDIR) $(INSTALL) $(OWN) -m $(INSTDOCS) $(CONFIGS) $(DESTDIR)$(SYSCONFDIR) $(INSTALL) $(OWN) -d $(DESTDIR)$(WWW_PAGES) diff --git a/src/camogm.c b/src/camogm.c index 0ed222b..9f67fba 100644 --- a/src/camogm.c +++ b/src/camogm.c @@ -30,6 +30,8 @@ #include #include +#include + #include "camogm_ogm.h" #include "camogm_jpeg.h" #include "camogm_mov.h" @@ -38,6 +40,8 @@ /** @brief Default debug level */ #define DEFAULT_DEBUG_LVL 6 +/** @brief Default poll() timeout in ms*/ +#define DEFAULT_POLL_TIMEOUT 1000 /** @brief JPEG trailer size in bytes */ #define TRAILER_SIZE 0x02 /** @brief Default segment duration in seconds */ @@ -1301,7 +1305,9 @@ char * getLineFromPipe(FILE* npipe) if (!cmdbufp) cmdbuf[cmdbufp] = 0; //null-terminate first access (probably not needed for the static buffer nlp = strpbrk(cmdbuf, ";\n"); if (!nlp) { //no complete string, try to read more - fl = fread(&cmdbuf[cmdbufp], 1, sizeof(cmdbuf) - cmdbufp - 1, npipe); + // 2019/01/16: this change is related to switching to poll() + //fl = fread(&cmdbuf[cmdbufp], 1, sizeof(cmdbuf) - cmdbufp - 1, npipe); + fl = read(npipe, &cmdbuf[cmdbufp], sizeof(cmdbuf) - cmdbufp - 1); cmdbuf[cmdbufp + fl] = 0; // is there any complete string in a buffer after reading? nlp = strpbrk(&cmdbuf[cmdbufp], ";\n"); // there were no new lines before cmdbufp @@ -1548,6 +1554,8 @@ int listener_loop(camogm_state *state) int curr_port = 0; const char *pipe_name = state->pipe_name; + struct pollfd pfd; + // create a named pipe // always delete the pipe if it existed, start a fresh one f_ok = access(pipe_name, F_OK); @@ -1567,6 +1575,8 @@ int listener_loop(camogm_state *state) } } + /* old */ + /* // now open the pipe - will block until something will be written (or just open for writing, // reads themselves will not block) if (!((cmd_file = fopen(pipe_name, "r")))) { @@ -1574,6 +1584,27 @@ int listener_loop(camogm_state *state) clean_up(state); return -5; } + */ + + /* new: 2019/01/16 */ + + // Adding poll() because supposedly read calls (read, fread) + // stopped being blocking at all + + // Why O_RDWR? + // https://stackoverflow.com/questions/22021253/poll-on-named-pipe-returns-with-pollhup-constantly-and-immediately + + if (!((cmd_file = open(pipe_name, O_RDWR|O_NONBLOCK)))) { + D0(fprintf(debug_file, "Can not open command file %s\n", pipe_name)); + clean_up(state); + return -5; + } + /* force(?) disable O_NONBLOCK */ + fcntl(cmd_file, F_SETFL, 0); + // ready to read + pfd.events = POLLIN; + pfd.fd = cmd_file; + D0(fprintf(debug_file, "Pipe %s open for reading\n", pipe_name)); // to make sure something is sent out // enter main processing loop @@ -1581,99 +1612,108 @@ int listener_loop(camogm_state *state) curr_port = select_port(state); state->port_num = curr_port; // look at command queue first - cmd = parse_cmd(state, cmd_file); - if (cmd) { - if (cmd < 0) D0(fprintf(debug_file, "Unrecognized command\n")); - } else if (state->prog_state == STATE_RUNNING) { // no commands in queue, started - switch ((rslt = -sendImageFrame(state))) { - case 0: - break; // frame sent OK, nothing to do (TODO: check file length/duration) - case CAMOGM_FRAME_NOT_READY: // just wait for the frame to appear at the current pointer - // we'll wait for a frame, not to waste resources. But if the compressor is stopped this program will not respond to any commands - // TODO - add another wait with (short) timeout? - fp0 = lseek(state->fd_circ[curr_port], 0, SEEK_CUR); - if (fp0 < 0) { - D0(fprintf(debug_file, "%s:line %d got broken frame (%d) before waiting for ready\n", __FILE__, __LINE__, fp0)); - rslt = CAMOGM_FRAME_BROKEN; - } else { - fp1 = lseek(state->fd_circ[curr_port], LSEEK_CIRC_WAIT, SEEK_END); - if (fp1 < 0) { - D0(fprintf(debug_file, "%s:line %d got broken frame (%d) while waiting for ready. Before that fp0=0x%x\n", __FILE__, __LINE__, fp1, fp0)); + + ret = poll(&pfd,1,DEFAULT_POLL_TIMEOUT); + + if (ret==0){ + D6(fprintf(debug_file, "Waiting for commands...\n")); + } + + if (pfd.revents & POLLIN){ + cmd = parse_cmd(state, cmd_file); + if (cmd) { + if (cmd < 0) D0(fprintf(debug_file, "Unrecognized command\n")); + } else if (state->prog_state == STATE_RUNNING) { // no commands in queue, started + switch ((rslt = -sendImageFrame(state))) { + case 0: + break; // frame sent OK, nothing to do (TODO: check file length/duration) + case CAMOGM_FRAME_NOT_READY: // just wait for the frame to appear at the current pointer + // we'll wait for a frame, not to waste resources. But if the compressor is stopped this program will not respond to any commands + // TODO - add another wait with (short) timeout? + fp0 = lseek(state->fd_circ[curr_port], 0, SEEK_CUR); + if (fp0 < 0) { + D0(fprintf(debug_file, "%s:line %d got broken frame (%d) before waiting for ready\n", __FILE__, __LINE__, fp0)); rslt = CAMOGM_FRAME_BROKEN; } else { - break; + fp1 = lseek(state->fd_circ[curr_port], LSEEK_CIRC_WAIT, SEEK_END); + if (fp1 < 0) { + D0(fprintf(debug_file, "%s:line %d got broken frame (%d) while waiting for ready. Before that fp0=0x%x\n", __FILE__, __LINE__, fp1, fp0)); + rslt = CAMOGM_FRAME_BROKEN; + } else { + break; + } } - } - // no break - case CAMOGM_FRAME_CHANGED: // frame parameters have changed - case CAMOGM_FRAME_NEXTFILE: // next file needed (need to switch to a new file (time/size exceeded limit) - case CAMOGM_FRAME_INVALID: // invalid frame pointer - case CAMOGM_FRAME_BROKEN: // frame broken (buffer overrun) - // restart the file - D3(fprintf(debug_file,"%s:line %d - sendImageFrame() returned -%d\n", __FILE__, __LINE__, rslt)); - camogm_stop(state); - state->prog_state = STATE_RESTARTING; - camogm_start(state); - break; - case CAMOGM_FRAME_FILE_ERR: // error with file I/O - case CAMOGM_FRAME_OTHER: // other errors - D0(fprintf(debug_file, "%s:line %d - error=%d\n", __FILE__, __LINE__, rslt)); - break; - default: - D0(fprintf(debug_file, "%s:line %d - should not get here (rslt=%d)\n", __FILE__, __LINE__, rslt)); - clean_up(state); - exit(-1); - } // switch sendImageFrame() - - // collect error statistics - if (rslt > 0 && rslt < CAMOGM_ERRNUM) - state->error_stat[curr_port][rslt]++; - - if ((rslt != 0) && (rslt != CAMOGM_FRAME_NOT_READY) && (rslt != CAMOGM_FRAME_CHANGED)) - // add port number to error code to facilitate debugging - state->last_error_code = rslt + 100 * state->port_num; - } else if (state->prog_state == STATE_STARTING) { // no commands in queue,starting (but not started yet) - - // retry starting - switch ((rslt = -camogm_start(state))) { - case 0: - break; // file started OK, nothing to do - case CAMOGM_TOO_EARLY: - lseek(state->fd_circ[curr_port], LSEEK_CIRC_TOWP, SEEK_END); // set pointer to the frame to wait for - lseek(state->fd_circ[curr_port], LSEEK_CIRC_WAIT, SEEK_END); // It already passed CAMOGM_FRAME_NOT_READY, so compressor may be running already - break; // no need to wait extra - case CAMOGM_FRAME_NOT_READY: // just wait for the frame to appear at the current pointer - // we'll wait for a frame, not to waste resources. But if the compressor is stopped this program will not respond to any commands - // TODO - add another wait with (short) timeout? - case CAMOGM_FRAME_CHANGED: // frame parameters have changed - case CAMOGM_FRAME_NEXTFILE: - case CAMOGM_FRAME_INVALID: // invalid frame pointer - case CAMOGM_FRAME_BROKEN: // frame broken (buffer overrun) - usleep(COMMAND_LOOP_DELAY); // it should be not too long so empty buffer will not be overrun - break; - case CAMOGM_FRAME_FILE_ERR: // error with file I/O - case CAMOGM_FRAME_OTHER: // other errors - D0(fprintf(debug_file, "%s:line %d - error=%d\n", __FILE__, __LINE__, rslt)); - break; - default: - D0(fprintf(debug_file, "%s:line %d - should not get here (rslt=%d)\n", __FILE__, __LINE__, rslt)); - clean_up(state); - exit(-1); - } // switch camogm_start() - - // collect error statistics - if (rslt > 0 && rslt < CAMOGM_ERRNUM) - state->error_stat[curr_port][rslt]++; - - if ((rslt != 0) && (rslt != CAMOGM_TOO_EARLY) && (rslt != CAMOGM_FRAME_NOT_READY) && (rslt != CAMOGM_FRAME_CHANGED) ) - // add port number to error code to facilitate debugging - state->last_error_code = rslt + 100 * state->port_num; - } else if (state->prog_state == STATE_READING) { - usleep(COMMAND_LOOP_DELAY); - } else { // not running, not starting - state->rawdev.thread_state = STATE_RUNNING; - usleep(COMMAND_LOOP_DELAY); // make it longer but interruptible by signals? - } + // no break + case CAMOGM_FRAME_CHANGED: // frame parameters have changed + case CAMOGM_FRAME_NEXTFILE: // next file needed (need to switch to a new file (time/size exceeded limit) + case CAMOGM_FRAME_INVALID: // invalid frame pointer + case CAMOGM_FRAME_BROKEN: // frame broken (buffer overrun) + // restart the file + D3(fprintf(debug_file,"%s:line %d - sendImageFrame() returned -%d\n", __FILE__, __LINE__, rslt)); + camogm_stop(state); + state->prog_state = STATE_RESTARTING; + camogm_start(state); + break; + case CAMOGM_FRAME_FILE_ERR: // error with file I/O + case CAMOGM_FRAME_OTHER: // other errors + D0(fprintf(debug_file, "%s:line %d - error=%d\n", __FILE__, __LINE__, rslt)); + break; + default: + D0(fprintf(debug_file, "%s:line %d - should not get here (rslt=%d)\n", __FILE__, __LINE__, rslt)); + clean_up(state); + exit(-1); + } // switch sendImageFrame() + + // collect error statistics + if (rslt > 0 && rslt < CAMOGM_ERRNUM) + state->error_stat[curr_port][rslt]++; + + if ((rslt != 0) && (rslt != CAMOGM_FRAME_NOT_READY) && (rslt != CAMOGM_FRAME_CHANGED)) + // add port number to error code to facilitate debugging + state->last_error_code = rslt + 100 * state->port_num; + } else if (state->prog_state == STATE_STARTING) { // no commands in queue,starting (but not started yet) + + // retry starting + switch ((rslt = -camogm_start(state))) { + case 0: + break; // file started OK, nothing to do + case CAMOGM_TOO_EARLY: + lseek(state->fd_circ[curr_port], LSEEK_CIRC_TOWP, SEEK_END); // set pointer to the frame to wait for + lseek(state->fd_circ[curr_port], LSEEK_CIRC_WAIT, SEEK_END); // It already passed CAMOGM_FRAME_NOT_READY, so compressor may be running already + break; // no need to wait extra + case CAMOGM_FRAME_NOT_READY: // just wait for the frame to appear at the current pointer + // we'll wait for a frame, not to waste resources. But if the compressor is stopped this program will not respond to any commands + // TODO - add another wait with (short) timeout? + case CAMOGM_FRAME_CHANGED: // frame parameters have changed + case CAMOGM_FRAME_NEXTFILE: + case CAMOGM_FRAME_INVALID: // invalid frame pointer + case CAMOGM_FRAME_BROKEN: // frame broken (buffer overrun) + usleep(COMMAND_LOOP_DELAY); // it should be not too long so empty buffer will not be overrun + break; + case CAMOGM_FRAME_FILE_ERR: // error with file I/O + case CAMOGM_FRAME_OTHER: // other errors + D0(fprintf(debug_file, "%s:line %d - error=%d\n", __FILE__, __LINE__, rslt)); + break; + default: + D0(fprintf(debug_file, "%s:line %d - should not get here (rslt=%d)\n", __FILE__, __LINE__, rslt)); + clean_up(state); + exit(-1); + } // switch camogm_start() + + // collect error statistics + if (rslt > 0 && rslt < CAMOGM_ERRNUM) + state->error_stat[curr_port][rslt]++; + + if ((rslt != 0) && (rslt != CAMOGM_TOO_EARLY) && (rslt != CAMOGM_FRAME_NOT_READY) && (rslt != CAMOGM_FRAME_CHANGED) ) + // add port number to error code to facilitate debugging + state->last_error_code = rslt + 100 * state->port_num; + } else if (state->prog_state == STATE_READING) { + usleep(COMMAND_LOOP_DELAY); + } else { // not running, not starting + state->rawdev.thread_state = STATE_RUNNING; + usleep(COMMAND_LOOP_DELAY); // make it longer but interruptible by signals? + } + } // if pfd.revents & POLLIN } // while (process) // normally, we should not be here diff --git a/src/camogm_fifo_reader.c b/src/camogm_fifo_reader.c new file mode 100644 index 0000000..83d11b1 --- /dev/null +++ b/src/camogm_fifo_reader.c @@ -0,0 +1,105 @@ +/** @brief This define is needed to use lseek64 and should be set before includes */ +#define _LARGEFILE64_SOURCE +/** Needed for O_DIRECT */ +#define _GNU_SOURCE + +#include +#include +#include + +#include +#include + +#include +#include +#include +#include +#include +#include + +#include + +// set to 1 second +#define CAMOGM_TIMEOUT 1000 + +int main(int argc, char *argv[]){ + + int f_ok; + int ret; + int fer, feo; + long int ftl; + int i=0; + const char pipe_name[] = "/tmp/fifo_test"; + int flags; + + char cmdbuf[1024]; + int fl; + + int somecounter = 0; + int writecounter = 0; + + FILE *cmd_file; + + struct pollfd pfd; + pfd.events = POLLIN; + + printf("This is reader. It creates FIFO: %s\n",pipe_name); + + // always delete pipe if it exists + f_ok = access(pipe_name, F_OK); + ret = unlink(pipe_name); + + if (ret && f_ok == 0) { + printf("Some error\n"); + } + + ret = mkfifo(pipe_name, 0666); //EEXIST + if (ret) { + if (errno==EEXIST){ + printf("Pipe exists\n"); + } + } + + cmd_file = open(pipe_name, O_RDWR|O_NONBLOCK); + pfd.fd = cmd_file; + fcntl(cmd_file, F_SETFL, 0); /* disable O_NONBLOCK */ + + printf("Pipe is now open for reading\n"); + + while(true){ + + ret = poll(&pfd, 1, CAMOGM_TIMEOUT); /* poll to avoid reading EOF */ + somecounter +=1; + + if (ret==0) { + printf("TIMEOUT %d\n",somecounter); + } + + if (pfd.revents & POLLIN){ + + printf("PostPoll %d %d, revents = %d, errno = %d\n",somecounter,ret,pfd.revents,errno); + + fl = read(cmd_file, cmdbuf, sizeof(cmdbuf)); + //clearerr(cmd_file); + + if (fl>0) { + writecounter +=1; + } + + if (fl<0) { + printf("Error?\n"); + break; + } + + + //fl = read(cmd_file,cmdbuf,sizeof(cmdbuf)); + if ((fl>10)||(somecounter>10000000)) { + break; + } + } + } + + printf("EXIT! errno=%d writes=%d wdc=%d\n",errno,writecounter,somecounter); + + return 0; +} diff --git a/src/camogm_fifo_writer.c b/src/camogm_fifo_writer.c new file mode 100644 index 0000000..219caed --- /dev/null +++ b/src/camogm_fifo_writer.c @@ -0,0 +1,36 @@ +/** @brief This define is needed to use lseek64 and should be set before includes */ +#define _LARGEFILE64_SOURCE +/** Needed for O_DIRECT */ +#define _GNU_SOURCE + +#include +#include +#include + +#include +#include + +#include +#include +#include +#include +#include + +int main(int argc, char *argv[]){ + + FILE *cmd_file; + const char pipe_name[] = "/tmp/fifo_test"; + const char message[] = "test"; + int ret; + + printf("This is writer\n"); + + cmd_file = fopen(pipe_name, "w"); + + while(true){ + ret = fwrite(message,sizeof(char),sizeof(message),cmd_file); + printf("Wrote %d bytes, fwrite returned %d\n",sizeof(message),ret); + } + + return 0; +} diff --git a/src/camogm_read.c b/src/camogm_read.c index ae50845..85b7772 100644 --- a/src/camogm_read.c +++ b/src/camogm_read.c @@ -1017,8 +1017,8 @@ void *reader(void *arg) .sockfd_const = &sockfd, .sockfd_temp = &fd }; - memset(&index_dir, 0, sizeof(struct disk_index)); - memset(&index_sparse, 0, sizeof(struct disk_index)); + memset(&index_dir, 0, sizeof(struct disk_idir)); + memset(&index_sparse, 0, sizeof(struct disk_idir)); prep_socket(&sockfd, state->sock_port); pthread_cleanup_push(exit_thread, &exit_state); -- 2.18.1