Commit 6cffcbe2 authored by Andrey Filippov's avatar Andrey Filippov

Almost keeps up with recording

parent b4d4414a
......@@ -284,7 +284,7 @@ void camogm_init(camogm_state *state, char *pipe_name, uint16_t port_num)
// state->writer_params.data_ready = false;
state->writer_params.chunk_page_prep = 0; // will prepare page 0
state->writer_params.chunk_page_write = 0; // will write page 0
state->writer_params.writev_run = false;
state->writer_params.exit_thread = false;
......@@ -1695,7 +1695,12 @@ int listener_loop(camogm_state *state)
case CAMOGM_FRAME_NOT_READY: // just wait for the frame to appear at the current pointer
// we'll wait for a frame, not to waste resources. But if the compressor is stopped this program will not respond to any commands
// TODO - add another wait with (short) timeout?
fp0 = lseek(state->fd_circ[curr_port], 0, SEEK_CUR);
wait_frame_sync(state->fd_fparmsall[curr_port]);
D6(fprintf(debug_file, "No compressed frame ready, waited for the next frame sync for port %d @ %06d\n",curr_port,get_fpga_usec(state->fd_fparmsall[0], 0)));
break;
// do not analyze next frame (may be non-existing), will do that next time
/*
fp0 = lseek(state->fd_circ[curr_port], 0, SEEK_CUR); // need to skip uncompressed frame, LWIR16 turns compressor off
if (fp0 < 0) {
D0(fprintf(debug_file, "%s:line %d got broken frame (%d) before waiting for ready\n", __FILE__, __LINE__, fp0));
rslt = CAMOGM_FRAME_BROKEN;
......@@ -1709,6 +1714,7 @@ int listener_loop(camogm_state *state)
break;
}
}
*/
// no break
case CAMOGM_FRAME_CHANGED: // frame parameters have changed
case CAMOGM_FRAME_NEXTFILE: // next file needed (need to switch to a new file (time/size exceeded limit)
......@@ -2294,3 +2300,14 @@ inline int get_fpga_usec(const int fd_fparsall, unsigned int port)
// return (int) getGPValue(port, G_MICROSECONDS);
return (int) GLOBALPARS(port, G_MICROSECONDS);
}
/**
* @brief Wait for frame sync
*/
inline void wait_frame_sync(const int fd_fparsall)
{
lseek(fd_fparsall, LSEEK_FRAME_WAIT_REL + 1, SEEK_END);
}
......@@ -68,7 +68,7 @@
/** @brief File can be split up to this number of chunks */
#define FILE_CHUNKS_NUM 8
/** @brief Number of chunk pages to overlap preparation with SSD raw write */
#define FILE_CHUNKS_PAGES 2
#define FILE_CHUNKS_PAGES 4 // 2
#define FILE_CHUNKS_TOTAL ((FILE_CHUNKS_NUM) * (FILE_CHUNKS_PAGES))
/**
* @enum state_flags
......@@ -153,6 +153,7 @@ struct writer_params {
// ///< For overlapping disk raw writes with chunks preparation (should be atomic - use #writer_mutex)
int chunk_page_prep; ///< page of chunks being prepared. Incremented (mod) after data is prepared for raw write
int chunk_page_write; ///< page of chunks to write. Incremented (mod) after recording to disk
bool writev_run; ///< writev() is active (enable main thread if buffer not full and writew or buf empty)
int last_ret_val; ///< error value return during last frame recording (if any occurred)
bool exit_thread; ///< flag indicating that the writing thread should terminate
int state; ///< the state of disk writing thread
......@@ -282,5 +283,6 @@ int waitDaemonEnabled(unsigned int port, int daemonBit);
int isDaemonEnabled(unsigned int port, int daemonBit);
int is_fd_valid(int fd);
int get_fpga_usec(const int fd_fparsall, unsigned int port);
inline void wait_frame_sync(const int fd_fparsall);
#endif /* _CAMOGM_H */
......@@ -546,6 +546,7 @@ void align_frame(camogm_state *state)
if (len >= (chunks[paged_CHUNK_DATA_1].iov_len + chunks[paged_CHUNK_TRAILER].iov_len)) {
/* current frame is not split or the second part of JPEG data is too short */
data_len = len - chunks[paged_CHUNK_DATA_1].iov_len - chunks[paged_CHUNK_TRAILER].iov_len;
D6(fprintf(debug_file, "_algn002: data_len=%d, copy %d short JPEG @ %06d\n", data_len, chunks[paged_CHUNK_DATA_1].iov_len, get_fpga_usec(state->fd_fparmsall[0], 0)));
src = vectrpos(&chunks[paged_CHUNK_DATA_0], data_len);
vectcpy(chunk_rem, src, data_len);
vectshrink(&chunks[paged_CHUNK_DATA_0], data_len);
......@@ -556,6 +557,7 @@ void align_frame(camogm_state *state)
} else if (len >= chunks[paged_CHUNK_TRAILER].iov_len) {
/* there is enough data in second part to align the frame */
data_len = len - chunks[paged_CHUNK_TRAILER].iov_len;
D6(fprintf(debug_file, "_algn003: data_len=%d, copy %d short JPEG @ %06d\n", data_len, chunks[paged_CHUNK_DATA_1].iov_len, get_fpga_usec(state->fd_fparmsall[0], 0)));
src = vectrpos(&chunks[paged_CHUNK_DATA_1], data_len);
vectcpy(chunk_rem, src, data_len);
vectshrink(&chunks[paged_CHUNK_DATA_1], data_len);
......@@ -567,6 +569,7 @@ void align_frame(camogm_state *state)
data_len = PHY_BLOCK_SIZE - (chunks[paged_CHUNK_TRAILER].iov_len - len);
if (data_len >= chunks[paged_CHUNK_DATA_1].iov_len) {
size_t cut_len = data_len - chunks[paged_CHUNK_DATA_1].iov_len;
D6(fprintf(debug_file, "_algn004: cut_len=%d, copy %d short JPEG @ %06d\n", cut_len, chunks[paged_CHUNK_DATA_1].iov_len, get_fpga_usec(state->fd_fparmsall[0], 0)));
src = vectrpos(&chunks[paged_CHUNK_DATA_0], cut_len);
vectcpy(chunk_rem, src, cut_len);
vectshrink(&chunks[paged_CHUNK_DATA_0], cut_len);
......@@ -575,6 +578,7 @@ void align_frame(camogm_state *state)
vectcpy(chunk_rem, chunks[paged_CHUNK_TRAILER].iov_base, chunks[paged_CHUNK_TRAILER].iov_len);
vectshrink(&chunks[paged_CHUNK_TRAILER], chunks[paged_CHUNK_TRAILER].iov_len);
} else {
D6(fprintf(debug_file, "_algn005: data_len=%d, @ %06d\n", data_len, get_fpga_usec(state->fd_fparmsall[0], 0)));
src = vectrpos(&chunks[paged_CHUNK_DATA_1], data_len);
vectcpy(chunk_rem, src, data_len);
vectshrink(&chunks[paged_CHUNK_DATA_1], data_len);
......@@ -588,11 +592,13 @@ void align_frame(camogm_state *state)
chunks[paged_CHUNK_ALIGN].iov_len = 0;
if (chunks[paged_CHUNK_DATA_1].iov_len == 0) {
data_len = chunks[paged_CHUNK_DATA_0].iov_len % ALIGNMENT_SIZE;
D6(fprintf(debug_file, "_algn006: data_len=%d, @ %06d\n", data_len, get_fpga_usec(state->fd_fparmsall[0], 0)));
src = vectrpos(&chunks[CHUNK_DATA_0], data_len);
vectcpy(&chunks[paged_CHUNK_ALIGN], src, data_len);
vectshrink(&chunks[paged_CHUNK_DATA_0], data_len);
} else {
data_len = chunks[paged_CHUNK_DATA_1].iov_len % ALIGNMENT_SIZE;
D6(fprintf(debug_file, "_algn007: data_len=%d, @ %06d\n", data_len, get_fpga_usec(state->fd_fparmsall[0], 0)));
src = vectrpos(&chunks[paged_CHUNK_DATA_1], data_len);
vectcpy(&chunks[paged_CHUNK_ALIGN], src, data_len);
vectshrink(&chunks[paged_CHUNK_DATA_1], data_len);
......
......@@ -323,33 +323,31 @@ int camogm_frame_jpeg(camogm_state *state)
for (int i = 0; i < state->chunk_index - 1; i++) {
D6(fprintf(debug_file, "ptr: %p, length: %ld\n", state->packetchunks[i + 1].chunk, state->packetchunks[i + 1].bytes));
}
D6(fprintf(debug_file, "waiting for empty buffer @ %06d\n",get_fpga_usec(state->fd_fparmsall[0], 0)));
// next frame is ready for recording, signal this to the writer thread
D6(fprintf(debug_file, "_13__: Yielding thread before getting a lock (%d, %d):%d @ %06d\n", state->writer_params.chunk_page_prep, state->writer_params.chunk_page_write, state->writer_params.writev_run, get_fpga_usec(state->fd_fparmsall[0], 0)));
pthread_yield();
pthread_mutex_lock(&state->writer_params.writer_mutex);
// while (state->writer_params.data_ready)
// pthread_cond_wait(&state->writer_params.main_cond, &state->writer_params.writer_mutex);
D6(fprintf(debug_file, "_13a_: waiting for empty buffer (%d, %d):%d @ %06d\n", state->writer_params.chunk_page_prep, state->writer_params.chunk_page_write, state->writer_params.writev_run, get_fpga_usec(state->fd_fparmsall[0], 0)));
// do not run ahead by >= FILE_CHUNKS_PAGES (0 or 1 for just FILE_CHUNKS_PAGES == 2)
while ((state->writer_params.chunk_page_prep - state->writer_params.chunk_page_write + (2 * FILE_CHUNKS_PAGES)) % (2 * FILE_CHUNKS_PAGES) >= FILE_CHUNKS_PAGES)
// either buffer is completely empty (state->writer_params->chunk_page_prep == state->writer_params->chunk_page_write)
// or it is not full and writev_run is active
while (((state->writer_params.chunk_page_prep - state->writer_params.chunk_page_write + (2 * FILE_CHUNKS_PAGES))
% (2 * FILE_CHUNKS_PAGES) >= FILE_CHUNKS_PAGES) ||
(!state->writer_params.writev_run && (state->writer_params.chunk_page_prep != state->writer_params.chunk_page_write))) {
D6(fprintf(debug_file, "_13b_: waiting for empty buffer (%d, %d):%d @ %06d\n", state->writer_params.chunk_page_prep, state->writer_params.chunk_page_write, state->writer_params.writev_run, get_fpga_usec(state->fd_fparmsall[0], 0)));
pthread_cond_wait(&state->writer_params.main_cond, &state->writer_params.writer_mutex);
}
pthread_mutex_unlock(&state->writer_params.writer_mutex);
D6(fprintf(debug_file, "_13a_: got empty buffer @ %06d\n",get_fpga_usec(state->fd_fparmsall[0], 0)));
D6(fprintf(debug_file, "_13c_: got empty buffer (%d, %d):%d @ %06d\n", state->writer_params.chunk_page_prep, state->writer_params.chunk_page_write, state->writer_params.writev_run, get_fpga_usec(state->fd_fparmsall[0], 0)));
align_frame(state);
/* // Moved to write thread
if (update_lba(state) == 1) {
D0(fprintf(debug_file, "The end of block device reached, continue recording from start\n"));
lseek(state->writer_params.blockdev_fd, 0, SEEK_SET);
}
D6(fprintf(debug_file, "Block device positions: start = %llu, current = %llu, end = %llu\n",
state->writer_params.lba_start, state->writer_params.lba_current, state->writer_params.lba_end));
*/
// proceed if last frame was recorded without errors
D6(fprintf(debug_file, "_13a1_:frame aligned @ %06d\n",get_fpga_usec(state->fd_fparmsall[0], 0)));
D6(fprintf(debug_file, "_13d_:frame aligned @ %06d\n",get_fpga_usec(state->fd_fparmsall[0], 0)));
pthread_mutex_lock(&state->writer_params.writer_mutex);
if (state->writer_params.last_ret_val == 0) {
state->writer_params.chunk_page_prep = (state->writer_params.chunk_page_prep + 1) % (2 * FILE_CHUNKS_PAGES);
// state->writer_params.data_ready = true;
pthread_cond_signal(&state->writer_params.writer_cond);
D6(fprintf(debug_file, "_13b_: chunk_page_prep = %d (%d), signal writer_cond @ %06d\n",state->writer_params.chunk_page_prep, state->writer_params.chunk_page_write, get_fpga_usec(state->fd_fparmsall[0], 0)));
D6(fprintf(debug_file, "_13e_: chunk_page_prep = %d (%d), signal writer_cond @ %06d\n",state->writer_params.chunk_page_prep, state->writer_params.chunk_page_write, get_fpga_usec(state->fd_fparmsall[0], 0)));
}
pthread_mutex_unlock(&state->writer_params.writer_mutex);
if (state->writer_params.last_ret_val != 0) {
......@@ -359,7 +357,7 @@ int camogm_frame_jpeg(camogm_state *state)
curr_time = time(NULL);
if (difftime(curr_time, state->writer_params.stat_update) > STAT_UPDATE_PERIOD) {
save_state_file(state);
D6(fprintf(debug_file, "_13c_: saved state file @ %06d\n", get_fpga_usec(state->fd_fparmsall[0], 0)));
D6(fprintf(debug_file, "_13f_: saved state file @ %06d\n", get_fpga_usec(state->fd_fparmsall[0], 0)));
}
}
......@@ -439,7 +437,8 @@ void *jpeg_writer(void *thread_args)
unsigned char dummy_buff[PHY_BLOCK_SIZE];
memset((void *)chunks_iovec, 0, sizeof(struct iovec) * FILE_CHUNKS_PAGES * FILE_CHUNKS_NUM);
// pthread_mutex_lock(&params->writer_mutex);
// lock, will only allow main thread while a) waiting for write buffer, and b) waiting for writev
//// pthread_mutex_lock(&params->writer_mutex);
params->state = STATE_RUNNING;
D6(fprintf(debug_file, "_w000_: Before jpeg_writer loop @ %06d\n", get_fpga_usec(state->fd_fparmsall[0], 0))); // before the debug file is set !
while (process) {
......@@ -497,7 +496,10 @@ void *jpeg_writer(void *thread_args)
state->writer_params.lba_current += blocks_write;
// D6(fprintf(debug_file, "_w02_: starting writev @ %06d\n",get_fpga_usec(state->fd_fparmsall[0], 0)));
D6(dbg_us1=get_fpga_usec(state->fd_fparmsall[0], 0); fprintf(debug_file, "_w02_: starting writev @ %06d\n",dbg_us1));
state->writer_params.writev_run = true;
pthread_cond_signal(&params->main_cond);
iovlen = writev(state->writer_params.blockdev_fd, chunks_iovec[wpage], chunk_index);
state->writer_params.writev_run = false;
// D6(fprintf(debug_file, "_w03_: finished writev @ %06d\n",get_fpga_usec(state->fd_fparmsall[0], 0)));
D6(dbg_us=get_fpga_usec(state->fd_fparmsall[0], 0); fprintf(debug_file, "_w03_: finished writev @ %06d (+ %06d, +%06d)\n",dbg_us, (dbg_us-dbg_us1), (dbg_us-dbg_us2)); dbg_us2=dbg_us);
if (iovlen < l) {
......@@ -535,7 +537,10 @@ void *jpeg_writer(void *thread_args)
}
// write the first group (before rollover)
state->writer_params.lba_current = state->writer_params.lba_start;
state->writer_params.writev_run = true;
pthread_cond_signal(&params->main_cond);
iovlen = writev(state->writer_params.blockdev_fd, chunks_iovec[wpage], i0);
state->writer_params.writev_run = false;
lseek(state->writer_params.blockdev_fd, 0, SEEK_SET); // regardless of error - rewind
if (iovlen < l0) {
D0(fprintf(debug_file, "(1 of 2) writev error: %s (returned %i, expected %i)\n", strerror(errno), iovlen, l0));
......@@ -555,7 +560,11 @@ void *jpeg_writer(void *thread_args)
for (i0 = i1; i0 < chunk_index; i0++){
chunks_iovec[wpage][i0 - i1] = chunks_iovec[wpage][i0];
}
state->writer_params.writev_run = true;
pthread_cond_signal(&params->main_cond);
iovlen = writev(state->writer_params.blockdev_fd, chunks_iovec[wpage], chunk_index - i1);
state->writer_params.writev_run = false;
state->writer_params.lba_current = state->writer_params.lba_start + l1 / PHY_BLOCK_SIZE;
if (iovlen < l1) {
D0(fprintf(debug_file, "(2 of 2) writev error: %s (returned %i, expected %i)\n", strerror(errno), iovlen, l1));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment