Commit 73ecbfc7 authored by Mikhail Karpenko's avatar Mikhail Karpenko

Start reading files in a separate thread

parent 824e2f22
......@@ -27,7 +27,7 @@ CFLAGS += -Wall -I$(ELPHEL_KERNEL_DIR)/include/elphel
all: $(PROGS)
$(PROGS): $(OBJS)
$(CC) $(LDFLAGS) $^ $(LDLIBS) -logg -o $@
$(CC) $(LDFLAGS) $^ $(LDLIBS) -logg -pthread -o $@
install: $(PROGS)
$(INSTALL) -d $(INSTDIR)
$(INSTALL) -m $(INSTMODE) -o $(INSTOWNER) -g $(INSTGROUP) $(PROGS) $(INSTDIR)
......
......@@ -87,6 +87,8 @@ camogm_state sstate;
/** @brief Memory mapped circular buffer arrays */
unsigned long * ccam_dma_buf[SENSOR_PORTS];
pthread_mutex_t print_mutex = PTHREAD_MUTEX_INITIALIZER;
/**
* @enum path_type
* @brief Define the path type passed to a function
......@@ -226,7 +228,8 @@ void camogm_init(camogm_state *state, char *pipe_name)
state->pipe_name = pipe_name;
state->rawdev.start_pos = RAWDEV_START_OFFSET;
state->rawdev.end_pos = state->rawdev.start_pos;
state->rawdev.curr_pos = state->rawdev.start_pos;
state->rawdev.curr_pos_w = state->rawdev.start_pos;
state->rawdev.curr_pos_r = state->rawdev.start_pos;
state->active_chn = ALL_CHN_ACTIVE;
}
......@@ -799,7 +802,7 @@ void camogm_set_prefix(camogm_state *state, const char * p, path_type type)
D0(fprintf(debug_file, "WARNING: raw device write initiated\n"));
state->rawdev_op = 1;
/* debug code follows */
state->rawdev.end_pos = 10485760; // 10 Mib
// state->rawdev.end_pos = 10485760; // 10 Mib
/* end of debug code */
}
}
......@@ -874,6 +877,7 @@ void camogm_set_start_after_timestamp(camogm_state *state, double d)
* @param[in] fn a pointer to a file name which will be used for output. Can be NULL or 'stdout' for
* output to stdout, 'stderr' for output to stderr and a file name for output to a file
* @param[in] xml flag indicating that the output should be in xml format
* @note access to state->rawdev.curr_pos_r is not locked in reading thread
* @return None
*/
void camogm_status(camogm_state *state, char * fn, int xml)
......@@ -888,6 +892,7 @@ void camogm_status(camogm_state *state, char * fn, int xml)
int _frames_skip = 0;
int _sec_skip = 0;
char *_kml_enable, *_kml_used, *_kml_height_mode;
unsigned int _percent_done;
_kml_enable = state->kml_enable ? "yes" : "no";
_kml_used = state->kml_used ? "yes" : "no";
......@@ -956,6 +961,10 @@ void camogm_status(camogm_state *state, char * fn, int xml)
"other"))) : "none";
_using_exif = state->exif ? "yes" : "no";
_using_global_pointer = state->save_gp ? "yes" : "no";
if (state->rawdev.curr_pos_r != 0 && state->rawdev.curr_pos_r > state->rawdev.start_pos)
_percent_done = 100 * state->rawdev.curr_pos_r / (state->rawdev.end_pos - state->rawdev.start_pos);
else
_percent_done = 0;
if (xml) {
fprintf(f, "<?xml version=\"1.0\"?>\n" \
......@@ -995,7 +1004,9 @@ void camogm_status(camogm_state *state, char * fn, int xml)
" <greedy>\"%s\"</greedy>\n" \
" <ignore_fps>\"%s\"</ignore_fps>\n" \
" <raw_device_path>\"%s\"</raw_device_path>\n" \
" <raw_device_overruns>%d</raw_device_overruns>\n",
" <raw_device_overruns>%d</raw_device_overruns>\n" \
" <raw_device_pos_write>0x%llx</raw_dev_pos_write>\n" \
" <raw_device_pos_read>0x%llx (%d\% done)</raw_device_pos_read>\n",
_state, state->path, state->frameno, state->start_after_timestamp, _dur, _udur, _len, \
_frames_skip, _sec_skip, \
state->width, state->height, _output_format, _using_exif, \
......@@ -1005,7 +1016,7 @@ void camogm_status(camogm_state *state, char * fn, int xml)
_kml_enable, _kml_used, state->kml_path, state->kml_horHalfFov, state->kml_vertHalfFov, state->kml_near, \
_kml_height_mode, state->kml_height, state->kml_period, state->kml_last_ts, state->kml_last_uts, \
state->greedy ? "yes" : "no", state->ignore_fps ? "yes" : "no", state->rawdev.rawdev_path,
state->rawdev.overrun);
state->rawdev.overrun, state->rawdev.curr_pos_w, state->rawdev.curr_pos_r, _percent_done);
FOR_EACH_PORT(int, chn) {
char *_active = is_chn_active(state, chn) ? "yes" : "no";
......@@ -1054,6 +1065,9 @@ void camogm_status(camogm_state *state, char * fn, int xml)
fprintf(f, "path prefix \t%s\n", state->path_prefix);
fprintf(f, "raw device path \t%s\n", state->rawdev.rawdev_path);
fprintf(f, "raw device overruns\t%d\n", state->rawdev.overrun);
fprintf(f, "raw write position \t0x%llx\n", state->rawdev.curr_pos_w);
fprintf(f, "raw read position \t0x%llx\n", state->rawdev.curr_pos_r);
fprintf(f, " percent done \t%d\%\n", _percent_done);
fprintf(f, "max file duration \t%d sec\n", state->segment_duration);
fprintf(f, "max file length \t%d B\n", state->segment_length);
fprintf(f, "max frames \t%d\n", state->max_frames);
......@@ -1321,6 +1335,13 @@ int parse_cmd(camogm_state *state, FILE* npipe)
D0(fprintf(debug_file, "Unable to switch state to 'reading' from current state. Check settings.\n"));
}
return 29;
} else if (strcmp(cmd, "rawdev_stop") == 0) {
if (state->prog_state == STATE_READING) {
state->rawdev.thread_state = STATE_CANCEL;
} else {
D0(fprintf(debug_file, "Reading thread is not running, nothing to stop\n"));
}
return 30;
}
return -1;
......@@ -1356,6 +1377,7 @@ void clean_up(camogm_state *state)
*/
int listener_loop(camogm_state *state)
{
void *tret;
FILE *cmd_file;
int rslt, ret, cmd, f_ok;
int fp0, fp1;
......@@ -1469,10 +1491,34 @@ int listener_loop(camogm_state *state)
exit(-1);
} // switch camogm_start()
if ((rslt != 0) && (rslt != CAMOGM_TOO_EARLY) && (rslt != CAMOGM_FRAME_NOT_READY) && (rslt != CAMOGM_FRAME_CHANGED) ) state->last_error_code = rslt;
} else if (state->prog_state == STATE_READING) {
build_index(state);
if (state->rawdev.thread_state == STATE_RUNNING) {
if (state->rawdev.thread_finished == true) {
state->rawdev.thread_state = STATE_STOPPED;
state->rawdev.thread_finished = false;
state->prog_state = STATE_STOPPED;
pthread_join(state->rawdev.tid, &tret);
if ((int)tret != 0 && (int)tret != PTHREAD_CANCELED) {
D0(fprintf(debug_file, "Reading thread returned error %d\n", (int)tret));
} else {
D3(fprintf(debug_file, "Reading thread stopped\n"));
}
} else {
usleep(COMMAND_LOOP_DELAY);
}
} else if (state->rawdev.thread_state == STATE_STOPPED) {
state->rawdev.thread_state = STATE_RUNNING;
state->rawdev.thread_finished = false;
if (pthread_create(&state->rawdev.tid, NULL, build_index, state) != 0) {
state->prog_state = STATE_STOPPED;
state->rawdev.thread_state = STATE_STOPPED;
D0(fprintf(debug_file, "%s:line %d: Can not start new thread, disk index is not built\n", __FILE__, __LINE__));
}
} else if (state->rawdev.thread_state == STATE_CANCEL) {
// cancel reading thread and return to running state waiting for the thread to terminate
state->rawdev.thread_state = STATE_RUNNING;
pthread_cancel(state->rawdev.tid);
}
} else { // not running, not starting
usleep(COMMAND_LOOP_DELAY); // make it longer but interruptible by signals?
}
......
......@@ -19,6 +19,8 @@
#ifndef _CAMOGM_H
#define _CAMOGM_H
#include <pthread.h>
#include <stdbool.h>
#include <exifa.h>
#include <c313a.h>
#include <ogg/ogg.h>
......@@ -41,13 +43,13 @@
#define CAMOGM_FORMAT_MOV 3 ///< output as Apple Quicktime
#define D(x) { if (debug_file && debug_level) { x; fflush(debug_file); } }
#define D0(x) { if (debug_file) { x; fflush(debug_file); } }
#define D1(x) { if (debug_file && (debug_level > 0)) { x; fflush(debug_file); } }
#define D2(x) { if (debug_file && (debug_level > 1)) { x; fflush(debug_file); } }
#define D3(x) { if (debug_file && (debug_level > 2)) { x; fflush(debug_file); } }
#define D4(x) { if (debug_file && (debug_level > 3)) { x; fflush(debug_file); } }
#define D5(x) { if (debug_file && (debug_level > 4)) { x; fflush(debug_file); } }
#define D6(x) { if (debug_file && (debug_level > 5)) { x; fflush(debug_file); } }
#define D0(x) { if (debug_file) { pthread_mutex_lock(&print_mutex); x; fflush(debug_file); pthread_mutex_unlock(&print_mutex); } }
#define D1(x) { if (debug_file && (debug_level > 0)) { pthread_mutex_lock(&print_mutex); x; fflush(debug_file); pthread_mutex_unlock(&print_mutex); } }
#define D2(x) { if (debug_file && (debug_level > 1)) { pthread_mutex_lock(&print_mutex); x; fflush(debug_file); pthread_mutex_unlock(&print_mutex); } }
#define D3(x) { if (debug_file && (debug_level > 2)) { pthread_mutex_lock(&print_mutex); x; fflush(debug_file); pthread_mutex_unlock(&print_mutex); } }
#define D4(x) { if (debug_file && (debug_level > 3)) { pthread_mutex_lock(&print_mutex); x; fflush(debug_file); pthread_mutex_unlock(&print_mutex); } }
#define D5(x) { if (debug_file && (debug_level > 4)) { pthread_mutex_lock(&print_mutex); x; fflush(debug_file); pthread_mutex_unlock(&print_mutex); } }
#define D6(x) { if (debug_file && (debug_level > 5)) { pthread_mutex_lock(&print_mutex); x; fflush(debug_file); pthread_mutex_unlock(&print_mutex); } }
//#define DD(x)
#define DD(x) { if (debug_file) { fprintf(debug_file, "%s:%d:", __FILE__, __LINE__); x; fflush(debug_file); } }
......@@ -67,8 +69,10 @@ enum state_flags {
STATE_STOPPED,
STATE_STARTING,
STATE_RUNNING,
STATE_READING
STATE_READING,
STATE_CANCEL
};
/**
* @struct rawdev_buffer
* @brief Holds pointers related to raw device buffer operation
......@@ -94,8 +98,12 @@ typedef struct {
uint32_t overrun;
uint64_t start_pos;
uint64_t end_pos;
uint64_t curr_pos;
uint64_t curr_pos_w;
volatile uint64_t curr_pos_r;
uint64_t file_start;
pthread_t tid;
int thread_state;
volatile bool thread_finished;
} rawdev_buffer;
/**
......@@ -190,6 +198,7 @@ typedef struct {
extern int debug_level;
extern FILE* debug_file;
extern pthread_mutex_t print_mutex;
void put_uint16(void *buf, u_int16_t val);
void put_uint32(void *buf, u_int32_t val);
......
......@@ -76,8 +76,8 @@ int camogm_start_jpeg(camogm_state *state)
return -CAMOGM_FRAME_FILE_ERR;
}
D3(fprintf(debug_file, "Open raw device %s; start_pos = %llu, end_pos = %llu, curr_pos = %llu\n", state->rawdev.rawdev_path,
state->rawdev.start_pos, state->rawdev.end_pos, state->rawdev.curr_pos));
lseek64(state->rawdev.rawdev_fd, state->rawdev.curr_pos, SEEK_SET);
state->rawdev.start_pos, state->rawdev.end_pos, state->rawdev.curr_pos_w));
lseek64(state->rawdev.rawdev_fd, state->rawdev.curr_pos_w, SEEK_SET);
}
}
......@@ -123,11 +123,11 @@ int camogm_frame_jpeg(camogm_state *state)
close(state->ivf);
} else {
D0(fprintf(debug_file, "\n%s: current pointers start_pos = %llu, end_pos = %llu, curr_pos = %llu, data in buffer %d\n", __func__,
state->rawdev.start_pos, state->rawdev.end_pos, state->rawdev.curr_pos, l));
state->rawdev.start_pos, state->rawdev.end_pos, state->rawdev.curr_pos_w, l));
split_index = -1;
for (int i = 0, total_len = 0; i < state->chunk_index - 1; i++) {
total_len += state->packetchunks[i + 1].bytes;
if (total_len + state->rawdev.curr_pos > state->rawdev.end_pos) {
if (total_len + state->rawdev.curr_pos_w > state->rawdev.end_pos) {
split_index = i;
chunks_used++;
D0(fprintf(debug_file, "\n>>> raw storage roll over detected\n"));
......@@ -141,7 +141,7 @@ int camogm_frame_jpeg(camogm_state *state)
if (i == split_index) {
// one of the chunks rolls over the end of the raw storage, split it into two segments and
// use additional chunk in chunks_iovec for this additional segment
split_cntr = state->rawdev.end_pos - (l + state->rawdev.curr_pos);
split_cntr = state->rawdev.end_pos - (l + state->rawdev.curr_pos_w);
split_ptr = state->packetchunks[k].chunk + split_cntr;
D3(fprintf(debug_file, "Splitting chunk #%d: total chunk size = %ld, start address = 0x%p\n",
......@@ -191,10 +191,10 @@ int camogm_frame_jpeg(camogm_state *state)
D0(fprintf(debug_file, "writev error %d (returned %d, expected %d)\n", j, iovlen, l));
return -CAMOGM_FRAME_FILE_ERR;
}
state->rawdev.curr_pos += l;
if (state->rawdev.curr_pos > state->rawdev.end_pos)
state->rawdev.curr_pos = state->rawdev.curr_pos - state->rawdev.end_pos + state->rawdev.start_pos;
D0(fprintf(debug_file, "%d bytes written, curr_pos = %llu\n", l, state->rawdev.curr_pos));
state->rawdev.curr_pos_w += l;
if (state->rawdev.curr_pos_w > state->rawdev.end_pos)
state->rawdev.curr_pos_w = state->rawdev.curr_pos_w - state->rawdev.end_pos + state->rawdev.start_pos;
D0(fprintf(debug_file, "%d bytes written, curr_pos = %llu\n", l, state->rawdev.curr_pos_w));
}
return 0;
......
......@@ -214,6 +214,11 @@ struct crb_ptrs {
struct iovec second_buff;
};
struct exit_state {
camogm_state *state;
int ret_val;
};
void dump_index_dir(const struct disk_idir *idir)
{
struct disk_index *ind = idir->head;
......@@ -680,6 +685,18 @@ static int write_buffer(struct file_opts *f_op, unsigned char *from, unsigned ch
return ret;
}
static inline void *exit_thread(void *arg)
{
struct exit_state *s = (struct exit_state *)arg;
if (s->state->rawdev.rawdev_fd > 0) {
close(s->state->rawdev.rawdev_fd);
s->state->rawdev.rawdev_fd = -1;
}
s->state->rawdev.thread_finished = true;
return (void *) s->ret_val;
}
/**
* @brief Extract JPEG files from raw device buffer
*
......@@ -687,9 +704,11 @@ static int write_buffer(struct file_opts *f_op, unsigned char *from, unsigned ch
* analyzed for JPEG markers and then the data from buffer is written to a file.
* @param[in] state a pointer to a structure containing current state
* @return 0 if files were extracted successfully and negative error code otherwise
* @warning The main processing loop of the function is enclosed in @e pthread_cleanup_push and @e pthread_cleanup_pop
* calls. The effect of use of normal @b return or @b break to prematurely leave this loop is undefined.
* @todo update description, reorder decision tree
*/
int build_index(camogm_state *state)
void *build_index(void *arg)
{
const int include_markers = INCLUDE_MARKERS;
int process;
......@@ -705,21 +724,22 @@ int build_index(camogm_state *state)
unsigned char *active_buff = buff;
unsigned char *save_from = NULL;
unsigned char *save_to = NULL;
// struct file_opts f_opts;
uint64_t dev_curr_pos = 0;
uint64_t include_st_marker, include_en_marker;
struct disk_idir index_dir = {0};
size_t add_stm_len, add_enm_len;
camogm_state *state = arg;
struct exit_state exit_state = {
.state = state,
.ret_val = 0
};
// memset(&f_opts, 0, sizeof(struct file_opts));
// f_opts.file_state = WRITE_STOP;
// f_opts.state = state;
//
state->rawdev.rawdev_fd = open(state->rawdev.rawdev_path, O_RDONLY);
if (state->rawdev.rawdev_fd < 0) {
D0(perror(__func__));
D0(fprintf(debug_file, "Error opening raw device %s\n", state->rawdev.rawdev_path));
return -CAMOGM_FRAME_FILE_ERR;
exit_state.ret_val = -CAMOGM_FRAME_FILE_ERR;
exit_thread(&exit_state);
}
if (include_markers) {
......@@ -738,6 +758,7 @@ int build_index(camogm_state *state)
zero_cross = 0;
search_state = SEARCH_SKIP;
idir_result = 0;
pthread_cleanup_push(exit_thread, &exit_state);
while (process) {
rd = read(state->rawdev.rawdev_fd, buff, sizeof(buff));
err = errno;
......@@ -813,7 +834,6 @@ int build_index(camogm_state *state)
// normal condition, start marker following stop marker found - this indicates a new file
uint64_t disk_pos = dev_curr_pos + pos_stop + (save_from - active_buff);
idir_result = stop_index(&index_dir, disk_pos);
dump_index_dir(&index_dir);
if (zero_cross == 0) {
state->rawdev.file_start = dev_curr_pos + pos_start + (save_from - active_buff);
idir_result = save_index(state, &index_dir);
......@@ -893,18 +913,23 @@ int build_index(camogm_state *state)
process = 0;
}
dev_curr_pos += rd;
state->rawdev.curr_pos_r = dev_curr_pos;
}
}
pthread_cleanup_pop(0);
D0(fprintf(debug_file, "\n%d files read from %s\n", index_dir.size, state->rawdev.rawdev_path));
if (close(state->rawdev.rawdev_fd) != 0) {
perror("Unable to close raw device: ");
return -CAMOGM_FRAME_FILE_ERR;
exit_state.ret_val = -CAMOGM_FRAME_FILE_ERR;
exit_thread(&exit_state);
}
state->rawdev.rawdev_fd = -1;
dump_index_dir(&index_dir);
delete_idir(&index_dir);
return 0;
exit_state.ret_val = 0;
exit_thread(&exit_state);
}
......@@ -35,7 +35,6 @@ struct disk_idir {
size_t size;
};
//int camogm_read(camogm_state *state);
int build_index(camogm_state *state);
void *build_index(void *arg);
#endif /* _CAMOGM_READ_H */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment