Commit fe15d689 authored by Andrey Filippov's avatar Andrey Filippov

added single write thread option

parent 67643efd
......@@ -364,16 +364,16 @@ int camogm_start(camogm_state *state, bool restart)
next_state_path(state); // increment path so next time will use different one.
} else {
if (state->prog_state == STATE_STARTING) {
D1(fprintf(debug_file, "Continue starting, state->path='%s'\n", state->path));
D1(syslog (LOG_INFO, "Continue starting, state->path='%s'\n", state->path));
D0(fprintf(debug_file, "Continue starting, state->path='%s'\n", state->path));
D0(syslog (LOG_INFO, "Continue starting, state->path='%s'\n", state->path));
} else {
D1(fprintf(debug_file, "Re-starting raw data recording, using the same state file, next will be %s, state->path='%s'\n", get_state_path(state), state->path));
D1(syslog (LOG_INFO, "Re-starting raw data recording, next will be %s", get_state_path(state)));
D0(fprintf(debug_file, "Re-starting raw data recording, using the same state file, next will be %s, state->path='%s'\n", get_state_path(state), state->path));
D0(syslog (LOG_INFO, "Re-starting raw data recording, next will be %s", get_state_path(state)));
}
}
} else {
D1(fprintf(debug_file, "Starting file system recording to %s\n", state->path));
D1(syslog (LOG_INFO, "Starting file system recording to %s", state->path));
D0(fprintf(debug_file, "Starting file system recording to %s\n", state->path));
D0(syslog (LOG_INFO, "Starting file system recording to %s", state->path));
}
double dtime_stamp;
......@@ -409,14 +409,23 @@ int camogm_start(camogm_state *state, bool restart)
pthread_mutex_unlock(&state->mutex);
D6(fprintf(debug_file, "Starting recording-1\n"));
#ifdef MIN_USED_SIZE
state->greedy = 1;
state->greedy = 1; // not used
#endif
FOR_EACH_PORT(int, chn) {
if (is_chn_active(state, chn)) {
// Check/set circbuf read pointer
D3(fprintf(debug_file, "1: state->cirbuf_rp=0x%x\n", state->cirbuf_rp[chn]));
D3(fprintf(debug_file, "1a: compressed frame number = %li\n", lseek(state->fd_circ[chn], LSEEK_CIRC_GETFRAME, SEEK_END)));
if ((state->cirbuf_rp[chn] < 0) || (lseek(state->fd_circ[chn], state->cirbuf_rp[chn], SEEK_SET) < 0) || (lseek(state->fd_circ[chn], LSEEK_CIRC_VALID, SEEK_END) < 0 )) {
D3(fprintf(debug_file, "1a: compressed frame number = %li\n", lseek(state->fd_circ[chn], LSEEK_CIRC_GETFRAME, SEEK_END)));
if ( (restart) || // restart - always flush buffer (will loose frames, but not continue broken frames)
(state->cirbuf_rp[chn] < 0) ||
(lseek(state->fd_circ[chn], state->cirbuf_rp[chn], SEEK_SET) < 0) ||
(lseek(state->fd_circ[chn], LSEEK_CIRC_VALID, SEEK_END) < 0 )) {
D0(fprintf(debug_file, "Flushing buffer and rebuilding new\n"));
D0(syslog (LOG_INFO, "Flushing buffer and rebuilding new"));
// reset bufer
D3(fprintf(debug_file, "2: state->cirbuf_rp=0x%x\n", state->cirbuf_rp[chn]));
/* In "greedy" mode try to save as many frames from the circbuf as possible */
// state->cirbuf_rp[chn] = lseek(state->fd_circ[chn], state->greedy ? LSEEK_CIRC_SCND : LSEEK_CIRC_LAST, SEEK_END);
......@@ -442,13 +451,17 @@ int camogm_start(camogm_state *state, bool restart)
// file pointer here should match state->rp; so no need to do lseek(state->fd_circ,state->cirbuf_rp,SEEK_SET);
state->buf_min[chn] = getGPValue(chn, G_FREECIRCBUF);
#endif
} else {
} else { // reuse the same buffer
#if ERRORS_FROM_START == 0
if (state->buf_min[chn] > getGPValue(chn, G_FREECIRCBUF)) {
state->buf_min[chn] = getGPValue(chn, G_FREECIRCBUF);
}
#endif
}
lseek(state->fd_circ[chn], state->cirbuf_rp[chn], SEEK_SET); // just in case - restore pointer
#if ERRORS_FROM_START
if (!restart) { // only reset errors and minimal buffer size during slient-initiated start (not after buffer overrun)
state->buf_mmin = state->circ_buff_size[0] - MIN_USED_SIZE;;
......@@ -464,6 +477,8 @@ int camogm_start(camogm_state *state, bool restart)
} else { // restart only after buffer overrun
if (state->prog_state != STATE_STARTING) { // during STATE_STARTING it continuously gets here if
state->error_restarts ++;
D0(fprintf(debug_file, "Incremented restart errors = %d\n", state->error_restarts));
D0(syslog (LOG_INFO, "Incremented restart errors = %d", state->error_restarts));
}
}
#endif
......@@ -906,7 +921,7 @@ int camogm_stop(camogm_state *state)
{
int rslt = 0;
if (state->prog_state != STATE_RUNNING) {
if ((state->prog_state != STATE_RUNNING) && (state->prog_state != STATE_RESTARTING)) {
if (state->prog_state != STATE_STARTING) {
D2(fprintf(debug_file, "Recording was not running, nothing to stop\n"));
} else {
......@@ -918,6 +933,8 @@ int camogm_stop(camogm_state *state)
return 0;
}
D1(fprintf(debug_file, "Ending recording\n"));
D0(syslog (LOG_INFO,"%s:line %d - Ending recording, state = %d", __FILE__, __LINE__, state->prog_state));
if (state->kml_used) camogm_end_kml(state);
switch (state->format) {
case CAMOGM_FORMAT_NONE: rslt = 0; break;
......@@ -932,7 +949,9 @@ int camogm_stop(camogm_state *state)
if (rslt) return rslt;
state->last = 1;
pthread_mutex_lock(&state->mutex);
state->prog_state = STATE_STOPPED;
if (state->prog_state != STATE_RESTARTING) {
state->prog_state = STATE_STOPPED;
}
pthread_mutex_unlock(&state->mutex);
D1(fprintf(debug_file, "Ended recording\n"));
DFLUSH;
......@@ -1894,6 +1913,7 @@ int listener_loop(camogm_state *state)
state->writer_params.circbuf_many = false; // will be set when main (preparing) thread is ahead of the writer threads by more than NUM_NEED_NEMPTY
if ((rslt=get_write_page(&state->writer_params) > 0)) { // still some prepared pages left
pthread_cond_signal(&state->writer_params.writer_cond); // wake up other writer threads?
#if NUM_WRITER_THREADS > 1
D3(fprintf(debug_file, "listener_loop(): some prepared pages remain (%d)[%d%d:%d%d] MANY=%d EMPTY=%d BUSY=%d [%d%d%d%d%d%d%d%d], SIGNALLED @ %07d\n",\
rslt, \
state->writer_params.writev_run[0],state->writer_params.writev_run[1], \
......@@ -1902,6 +1922,16 @@ int listener_loop(camogm_state *state)
state->writer_params.chunk_page_state[0],state->writer_params.chunk_page_state[1],state->writer_params.chunk_page_state[2],state->writer_params.chunk_page_state[3], \
state->writer_params.chunk_page_state[4],state->writer_params.chunk_page_state[5],state->writer_params.chunk_page_state[6],state->writer_params.chunk_page_state[7], \
get_fpga_usec(state->fd_fparmsall[0], 0)));
#else
D3(fprintf(debug_file, "listener_loop(): some prepared pages remain (%d)[%d:%d] MANY=%d EMPTY=%d BUSY=%d [%d%d%d%d%d%d%d%d], SIGNALLED @ %07d\n",\
rslt, \
state->writer_params.writev_run[0], \
state->writer_params.write_waits_sig[0], \
state->writer_params.circbuf_many, get_num_empty(&state->writer_params), get_num_busy(&state->writer_params), \
state->writer_params.chunk_page_state[0],state->writer_params.chunk_page_state[1],state->writer_params.chunk_page_state[2],state->writer_params.chunk_page_state[3], \
state->writer_params.chunk_page_state[4],state->writer_params.chunk_page_state[5],state->writer_params.chunk_page_state[6],state->writer_params.chunk_page_state[7], \
get_fpga_usec(state->fd_fparmsall[0], 0)));
#endif
}
pthread_mutex_unlock(&state->writer_params.writer_mutex);
......
......@@ -30,7 +30,7 @@
#include <syslog.h>
#include <malloc.h> // debugging
#define NUM_WRITER_THREADS 2
#define NUM_WRITER_THREADS 1+0 // 2
#define NUM_NEED_EMPTY (NUM_WRITER_THREADS + 1) // run main thread to prepage more pages if >= empty
#define NUM_NEED_NEMPTY (NUM_WRITER_THREADS + 0) // wake up main thread if there are at least this number of empty pages
#define NUM_FREE_MANY (NUM_WRITER_THREADS + 1) // Set many if get_num_empty() < NUM_FREE_MANY
......@@ -53,7 +53,7 @@
#define CAMOGM_FORMAT_OGM 1 ///< output as Ogg Media file
#define CAMOGM_FORMAT_JPEG 2 ///< output as individual JPEG files
#define CAMOGM_FORMAT_MOV 3 ///< output as Apple Quicktime
#define CAMOGM_MIN_BUF_FRAMES 2 ///< buffer should accomodate at list this number of frames
#define CAMOGM_MIN_BUF_FRAMES 4 ///< buffer should accommodate at least this number of frames
#define FLUSH_DEBUG 1 ///< flush debug file after each print
#define USE_OGG_PACKET_TYPE 0 /// do not prepend image with packet type, such as PACKET_IS_SYNCPOINT
......
......@@ -759,15 +759,22 @@ int camogm_frame_jpeg(camogm_state *state)
D6(fprintf(debug_file, "ptr: %p, length: %ld\n", state->packetchunks[i].chunk, state->packetchunks[i].bytes));
}
// next frame is ready for recording, signal this to the writer thread
#if NUM_WRITER_THREADS > 1
D6(fprintf(debug_file, "_13_: Yielding thread before getting a lock (%d):[%d%d:%d%d] @ %07d\n",\
params->chunk_page_prep, params->writev_run[0], params->writev_run[1], \
params->write_waits_sig[0], params->write_waits_sig[1], get_fpga_usec(state->fd_fparmsall[0], 0)));
#else
D6(fprintf(debug_file, "_13_: Yielding thread before getting a lock (%d):[%d :%d ] @ %07d\n",\
params->chunk_page_prep, params->writev_run[0], \
params->write_waits_sig[0], get_fpga_usec(state->fd_fparmsall[0], 0)));
#endif
pthread_yield();
pthread_mutex_lock(&params->writer_mutex);
/// params->circbuf_many = get_num_empty(params) >= NUM_NEED_NEMPTY;
if (get_num_empty(params) < NUM_FREE_MANY) {
params->circbuf_many = true;
}
#if NUM_WRITER_THREADS > 1
D3(dbg_start_wait = get_fpga_usec(state->fd_fparmsall[0], 0); fprintf(debug_file, \
"_13a_: waiting for empty buffer (%d): [%d%d%d%d%d%d%d%d] LOCK [%d%d:%d%d:%d%d] @ %07d\n",\
params->chunk_page_prep, \
......@@ -777,17 +784,29 @@ int camogm_frame_jpeg(camogm_state *state)
params->write_waits_sig[0], params->write_waits_sig[1], \
params->write_go[0], params->write_go[1], \
dbg_start_wait));
#else
D3(dbg_start_wait = get_fpga_usec(state->fd_fparmsall[0], 0); fprintf(debug_file, \
"_13a_: waiting for empty buffer (%d): [%d%d%d%d%d%d%d%d] LOCK [%d:%d:%d] @ %07d\n",\
params->chunk_page_prep, \
params->chunk_page_state[0],params->chunk_page_state[1],params->chunk_page_state[2],params->chunk_page_state[3], \
params->chunk_page_state[4],params->chunk_page_state[5],params->chunk_page_state[6],params->chunk_page_state[7], \
params->writev_run[0], \
params->write_waits_sig[0], \
params->write_go[0], \
dbg_start_wait));
#endif
// do not run ahead by >= FILE_CHUNKS_PAGES (0 or 1 for just FILE_CHUNKS_PAGES == 2)
// either buffer is completely empty (state->writer_params->chunk_page_prep == state->writer_params->chunk_page_write)
// or it is not full and writev_run is active
#if THREADS_MODE == 0
#if NUM_WRITER_THREADS > 1
#if THREADS_MODE == 0
while (((i = get_num_empty(params)) < NUM_NEED_EMPTY) && !params->exit_write_threads) { // get_num_empty(params)< 3?
D6(fprintf(debug_file, "_13b_: waiting for empty buffer (%d):[%d%d:%d%d] @ %07d\n",\
params->chunk_page_prep, params->writev_run[0], params->writev_run[1], \
params->write_waits_sig[0], params->write_waits_sig[1], get_fpga_usec(state->fd_fparmsall[0], 0)));
pthread_cond_wait(&params->main_cond, &params->writer_mutex);
} // pthread_cond_signal, pthread_cond_broadcast - signal or broadcast a condition
#elif THREADS_MODE > 0
#elif THREADS_MODE > 0
while ((((i = get_num_empty(params)) < NUM_NEED_EMPTY)
// || ()
) && !params->exit_write_threads) { // get_num_empty(params)< 3?
......@@ -796,9 +815,18 @@ int camogm_frame_jpeg(camogm_state *state)
params->write_waits_sig[0], params->write_waits_sig[1], get_fpga_usec(state->fd_fparmsall[0], 0)));
pthread_cond_wait(&params->main_cond, &params->writer_mutex);
} // pthread_cond_signal, pthread_cond_broadcast - signal or broadcast a condition
#endif
#else // single write thread
while (((i = get_num_empty(params)) < NUM_NEED_EMPTY) && !params->exit_write_threads) { // get_num_empty(params)< 3?
D6(fprintf(debug_file, "_13b_: waiting for empty buffer (%d):[%d:%d] @ %07d\n",\
params->chunk_page_prep, params->writev_run[0], \
params->write_waits_sig[0], get_fpga_usec(state->fd_fparmsall[0], 0)));
pthread_cond_wait(&params->main_cond, &params->writer_mutex);
} // pthread_cond_signal, pthread_cond_broadcast - signal or broadcast a condition
#endif
pthread_mutex_unlock(&params->writer_mutex);
#if NUM_WRITER_THREADS > 1
D3(dbg_wait=get_fpga_usec(state->fd_fparmsall[0], 0) - dbg_start_wait; if (dbg_wait <0) dbg_wait+=10000000;
fprintf(debug_file, "_13b0_: got empty buffer at %d (%d pages): [%d%d%d%d%d%d%d%d] [%d%d:%d%d:%d%d] @ %07d (+%07d)\n",\
params->chunk_page_prep, i, \
......@@ -808,6 +836,17 @@ int camogm_frame_jpeg(camogm_state *state)
params->write_waits_sig[0], params->write_waits_sig[1], \
params->write_go[0], params->write_go[1], \
get_fpga_usec(state->fd_fparmsall[0], 0), dbg_wait));
#else
D3(dbg_wait=get_fpga_usec(state->fd_fparmsall[0], 0) - dbg_start_wait; if (dbg_wait <0) dbg_wait+=10000000;
fprintf(debug_file, "_13b0_: got empty buffer at %d (%d pages): [%d%d%d%d%d%d%d%d] [%d:%d:%d] @ %07d (+%07d)\n",\
params->chunk_page_prep, i, \
params->chunk_page_state[0],params->chunk_page_state[1],params->chunk_page_state[2],params->chunk_page_state[3], \
params->chunk_page_state[4],params->chunk_page_state[5],params->chunk_page_state[6],params->chunk_page_state[7], \
params->writev_run[0], \
params->write_waits_sig[0], \
params->write_go[0], \
get_fpga_usec(state->fd_fparmsall[0], 0), dbg_wait));
#endif
if (params->exit_write_threads) { // at least one of the threads abnormally exited
D0(fprintf(debug_file, "_13b1_:exit_write_threads is true, abnormally exiting threads @ %07d\n",get_fpga_usec(state->fd_fparmsall[0], 0)));
......@@ -821,10 +860,15 @@ int camogm_frame_jpeg(camogm_state *state)
D0(fprintf(debug_file, "_13b3_:all writer threads exited @ %07d\n",get_fpga_usec(state->fd_fparmsall[0], 0)));
return -CAMOGM_FRAME_FILE_ERR;
}
#if NUM_WRITER_THREADS > 1
D6(fprintf(debug_file, "_13c_: got empty buffer (%d):[%d%d:%d%d] @ %07d\n",\
params->chunk_page_prep, params->writev_run[0], params->writev_run[1], \
params->write_waits_sig[0], params->write_waits_sig[1], get_fpga_usec(state->fd_fparmsall[0], 0)));
#else
D6(fprintf(debug_file, "_13c_: got empty buffer (%d):[%d:%d] @ %07d\n",\
params->chunk_page_prep, params->writev_run[0], \
params->write_waits_sig[0], get_fpga_usec(state->fd_fparmsall[0], 0)));
#endif
// params->dbg_data[params->chunk_page_prep][0] = port; // inside remap_vectors(state)
remap_vectors(state); // now can be used instead of align_frame(state), returns total length
// save segment file position, update next
......@@ -843,6 +887,8 @@ int camogm_frame_jpeg(camogm_state *state)
if (params->last_ret_val == 0) {
params->chunk_page_state[params->chunk_page_prep] = SEGPAGE_FULL; // mark as full;
params->chunk_page_prep = (params->chunk_page_prep + 1) % SEGMENTS_PAGES; // ***************** Here NEXT SEGMENT
#if NUM_WRITER_THREADS > 1
if (params->write_waits_sig[0] || params->write_waits_sig[1]) {
pthread_cond_broadcast(&params->writer_cond);
D3(fprintf(debug_file, "_13e_: chunk_page_prep = %d, signaled writer_cond * [%d%d:%d%d:%d%d] @ %07d\n", \
......@@ -857,6 +903,22 @@ int camogm_frame_jpeg(camogm_state *state)
params->write_go[0], params->write_go[1], \
get_fpga_usec(state->fd_fparmsall[0], 0)));
}
#else
if (params->write_waits_sig[0]) {
pthread_cond_broadcast(&params->writer_cond);
D3(fprintf(debug_file, "_13es_: chunk_page_prep = %d, signaled writer_cond * [%d:%d:%d] @ %07d\n", \
params->chunk_page_prep, params->writev_run[0], \
params->write_waits_sig[0], \
params->write_go[0], \
get_fpga_usec(state->fd_fparmsall[0], 0)));
} else {
D3(fprintf(debug_file, "_13e0s_: chunk_page_prep = %d, nobody listens, no signal -[%d:%d:%d] @ %07d\n", \
params->chunk_page_prep, params->writev_run[0], \
params->write_waits_sig[0], \
params->write_go[0], \
get_fpga_usec(state->fd_fparmsall[0], 0)));
}
#endif
}
pthread_mutex_unlock(&params->writer_mutex);
// add yield here?
......@@ -901,6 +963,7 @@ int camogm_end_jpeg(camogm_state *state)
D1(fprintf(debug_file, "Write last block of data, size = %d, full size = %d\n", bytes, ceil_size));
pthread_mutex_lock(&params->writer_mutex);
params->circbuf_many = true; // finish slowly?
#if NUM_WRITER_THREADS > 1
D3(fprintf(debug_file, \
"_x13a_: waiting for empty buffer (%d): [%d%d%d%d%d%d%d%d] LOCK [%d%d:%d%d:%d%d] @ %07d\n",\
params->chunk_page_prep, \
......@@ -921,15 +984,27 @@ int camogm_end_jpeg(camogm_state *state)
pthread_cond_wait(&params->main_cond, &params->writer_mutex);
}
#endif
#else
D3(fprintf(debug_file, \
"_x13a_: waiting for empty buffer (%d): [%d%d%d%d%d%d%d%d] LOCK [%d:%d:%d] @ %07d\n",\
params->chunk_page_prep, \
params->chunk_page_state[0],params->chunk_page_state[1],params->chunk_page_state[2],params->chunk_page_state[3], \
params->chunk_page_state[4],params->chunk_page_state[5],params->chunk_page_state[6],params->chunk_page_state[7], \
params->writev_run[0], \
params->write_waits_sig[0], \
params->write_go[0], \
get_fpga_usec(state->fd_fparmsall[0], 0)));
#endif
#if THREADS_MODE == 0
#if NUM_WRITER_THREADS > 1
#if THREADS_MODE == 0
while (((i = get_num_empty(params)) < NUM_NEED_EMPTY) && !params->exit_write_threads) { // get_num_empty(params)< 3?
D3(fprintf(debug_file, "_x13b_: waiting for empty buffer (%d):[%d%d:%d%d] @ %07d\n",\
params->chunk_page_prep, params->writev_run[0], params->writev_run[1], \
params->write_waits_sig[0], params->write_waits_sig[1], get_fpga_usec(state->fd_fparmsall[0], 0)));
pthread_cond_wait(&params->main_cond, &params->writer_mutex);
} // pthread_cond_signal, pthread_cond_broadcast - signal or broadcast a condition
#elif THREADS_MODE > 0
#elif THREADS_MODE > 0
while ((((i = get_num_empty(params)) < NUM_NEED_EMPTY)
) && !params->exit_write_threads) { // get_num_empty(params)< 3?
D3(fprintf(debug_file, "_x13b_: waiting for empty buffer (%d):[%d%d:%d%d] @ %07d\n",\
......@@ -937,6 +1012,14 @@ int camogm_end_jpeg(camogm_state *state)
params->write_waits_sig[0], params->write_waits_sig[1], get_fpga_usec(state->fd_fparmsall[0], 0)));
pthread_cond_wait(&params->main_cond, &params->writer_mutex);
} // pthread_cond_signal, pthread_cond_broadcast - signal or broadcast a condition
#endif
#else
while (((i = get_num_empty(params)) < NUM_NEED_EMPTY) && !params->exit_write_threads) { // get_num_empty(params)< 3?
D3(fprintf(debug_file, "_x13b_: waiting for empty buffer (%d):[%d:%d] @ %07d\n",\
params->chunk_page_prep, params->writev_run[0], \
params->write_waits_sig[0], get_fpga_usec(state->fd_fparmsall[0], 0)));
pthread_cond_wait(&params->main_cond, &params->writer_mutex);
} // pthread_cond_signal, pthread_cond_broadcast - signal or broadcast a condition
#endif
pthread_mutex_unlock(&params->writer_mutex);
// prepare last segment, increment segment page
......@@ -1072,21 +1155,29 @@ void *jpeg_writer(void *thread_args)
pthread_mutex_lock(&params->writer_mutex); // first time before the debug file is set !
params->write_go[this_thread_index] = false;
params->write_waits_sig[this_thread_index] = true;
#if NUM_WRITER_THREADS > 1
D3(dbg_start_wait = get_fpga_usec(state->fd_fparmsall[0], 0); \
fprintf(debug_file, "%s_w00_: (%d) thread ID=%ld got LOCK [%d%d] @ %07d\n", dbg_pref,\
params->chunk_page_prep, pthread_self(),
params->write_waits_sig[0], params->write_waits_sig[1], dbg_start_wait));
#else
D3(dbg_start_wait = get_fpga_usec(state->fd_fparmsall[0], 0); \
fprintf(debug_file, "%s_w00_: (%d) thread ID=%ld got LOCK [%d] @ %07d\n", dbg_pref,\
params->chunk_page_prep, pthread_self(),
params->write_waits_sig[0], dbg_start_wait));
#endif
// will not start if there are less than 3 (num threads+1) ready pages so the latest one that have DMA open
// will not be touched (its header can return zeros)
// finishing off the remaining pages (slow) is done when no other write threads may be still running DMA
#if THREADS_MODE == 0
#if NUM_WRITER_THREADS > 1
#if THREADS_MODE == 0
while ((((wpage = get_write_page(params)) < 0) ||
(params->circbuf_many ?
((i = get_num_empty(params)) >= (SEGMENTS_PAGES - NUM_WRITER_THREADS)) : // FULL+BYSY >= (threads+1)
((i = get_num_busy(params)) > 0) ) // one-by-one
|| (!params->write_waits_sig[1 - this_thread_index] && !params->write_go[this_thread_index]) // other write thread is not waiting, not already set go
) && !params->exit_write_threads) {
#if 1
#if 1
if (get_num_empty(params) >= (NUM_NEED_EMPTY + 2)){
pthread_cond_signal(&params->main_cond); // signal to main thread that prepares data
D3(fprintf(debug_file, "%s_w00.2_: signaling main while waiting to start write [%d%d:%d%d:%d%d] @ %07d\n", \
......@@ -1096,10 +1187,10 @@ void *jpeg_writer(void *thread_args)
params->write_go[0], params->write_go[1], \
get_fpga_usec(state->fd_fparmsall[0], 0)));
}
#endif
#endif
pthread_cond_wait(&params->writer_cond, &params->writer_mutex); // They are called with mutex locked by the calling thread or undefined behaviour will result.
}
#elif THREADS_MODE > 0
#elif THREADS_MODE > 0
while ((((wpage = get_write_page(params)) < 0) ||
(params->circbuf_many ?
((i = get_num_empty(params)) >= (SEGMENTS_PAGES - NUM_WRITER_THREADS)) : // FULL+BYSY >= (threads+1)
......@@ -1107,7 +1198,7 @@ void *jpeg_writer(void *thread_args)
// || (!params->write_waits_sig[1 - this_thread_index] && !params->write_go[this_thread_index]) // other write thread is not waiting, not already set go
) && !params->exit_write_threads) {
// Wake up main thread while waiting
#if 1
#if 1
if (get_num_empty(params) >= (NUM_NEED_EMPTY + 2)){
pthread_cond_signal(&params->main_cond); // signal to main thread that prepares data
D3(fprintf(debug_file, "%s_w00.2_: signaling main while waiting to start write [%d%d:%d%d:%d%d] @ %07d\n", \
......@@ -1117,11 +1208,42 @@ void *jpeg_writer(void *thread_args)
params->write_go[0], params->write_go[1], \
get_fpga_usec(state->fd_fparmsall[0], 0)));
}
#endif
#endif
pthread_cond_wait(&params->writer_cond, &params->writer_mutex); // They are called with mutex locked by the calling thread or undefined behaviour will result.
}
#endif
#else // here NUM_WRITER_THREADS == 1
while ((((wpage = get_write_page(params)) < 0) ||
(params->circbuf_many ?
((i = get_num_empty(params)) >= (SEGMENTS_PAGES - NUM_WRITER_THREADS)) : // FULL+BYSY >= (threads+1)
((i = get_num_busy(params)) > 0) ) // one-by-one
) && !params->exit_write_threads) {
#if 1
if (get_num_empty(params) >= (NUM_NEED_EMPTY + 2)){
pthread_cond_signal(&params->main_cond); // signal to main thread that prepares data
D3(fprintf(debug_file, "_w00.2_: signaling main while waiting to start write [%d:%d:%d] @ %07d\n", \
params->writev_run[0], \
params->write_waits_sig[0], \
params->write_go[0], \
get_fpga_usec(state->fd_fparmsall[0], 0)));
} else {
D3(fprintf(debug_file, "w00.2x_: wpage=%d, circbuf_many=%d get_num_empty()=%d, get_num_busy()=%d @ %07d\n", \
wpage,
params->circbuf_many,
get_num_empty(params),
get_num_busy(params),
get_fpga_usec(state->fd_fparmsall[0], 0)
));
}
#endif
pthread_cond_wait(&params->writer_cond, &params->writer_mutex); // They are called with mutex locked by the calling thread or undefined behaviour will result.
}
#endif
#if NUM_WRITER_THREADS > 1
D3(dbg_time= get_fpga_usec(state->fd_fparmsall[0], 0); dbg_wait= dbg_time - dbg_start_wait; if (dbg_wait <0) dbg_wait+=10000000; \
fprintf(debug_file, "%s_w00.3_: (%d, %d), empty/busy:%d many=%d thread ID=%ld got S+D [%d%d:%d%d:%d%d] @ %07d (+%07d)\n", dbg_pref,\
params->chunk_page_prep, wpage, i, params->circbuf_many, pthread_self(), \
......@@ -1129,16 +1251,28 @@ void *jpeg_writer(void *thread_args)
params->write_waits_sig[0], params->write_waits_sig[1], \
params->write_go[0], params->write_go[1], \
dbg_time, dbg_wait)); // dbg_time
#if THREADS_MODE == 0
#else
D3(dbg_time= get_fpga_usec(state->fd_fparmsall[0], 0); dbg_wait= dbg_time - dbg_start_wait; if (dbg_wait <0) dbg_wait+=10000000; \
fprintf(debug_file, "%s_w00.3_: (%d, %d), empty/busy:%d many=%d thread ID=%ld got S+D [%d:%d:%d] @ %07d (+%07d)\n", dbg_pref,\
params->chunk_page_prep, wpage, i, params->circbuf_many, pthread_self(), \
params->writev_run[0], \
params->write_waits_sig[0], \
params->write_go[0], \
dbg_time, dbg_wait)); // dbg_time
#endif
#if NUM_WRITER_THREADS > 1 // nothong to do for a single thread
#if THREADS_MODE == 0
if (params->write_waits_sig[1 - this_thread_index]) { // still waiting?
params->write_go[1 - this_thread_index] = true;
pthread_cond_broadcast(&params->writer_cond); // signal to other writer thread waiting for this one to finish writing
}
#elif THREADS_MODE > 0
#elif THREADS_MODE > 0
if (params->write_waits_sig[1 - this_thread_index]) { // still waiting?
params->write_go[1 - this_thread_index] = true;
pthread_cond_broadcast(&params->writer_cond); // signal to other writer thread waiting for this one to finish writing
}
#endif
#endif
params->write_go[this_thread_index] = false;
params->write_waits_sig[this_thread_index] = false;
......@@ -1150,15 +1284,22 @@ void *jpeg_writer(void *thread_args)
D3(fprintf(debug_file, "%s_w00.5_: (%d, %d) thread ID=%ld @ %07d\n", dbg_pref,\
params->chunk_page_prep, wpage, pthread_self(), get_fpga_usec(state->fd_fparmsall[0], 0)));
if (wpage >= 0) {
D6(fprintf(debug_file, "%s_w01_: got write segment page %d (preparing %d) @ %07d\n", dbg_pref,\
wpage, params->chunk_page_prep, get_fpga_usec(state->fd_fparmsall[0], 0)));
D3(fprintf(debug_file, "%s_w01_: got write segment page %d (preparing %d) [%d%d%d%d%d%d%d%d] @ %07d\n", dbg_pref,\
wpage, params->chunk_page_prep,
params->chunk_page_state[0],params->chunk_page_state[1],params->chunk_page_state[2],params->chunk_page_state[3], \
params->chunk_page_state[4],params->chunk_page_state[5],params->chunk_page_state[6],params->chunk_page_state[7], \
get_fpga_usec(state->fd_fparmsall[0], 0)));
image_len_padded = 0;
reset_rem = 0;
params->last_ret_val = 0;
// inputs from params->data_chunks, returns number of result chunks
#define MAX_SEGM_PAGES 0xa0 // for large JPEG - there is some me introduced bug in the kernel that limits chunk size. maybe 0xff - OK?
#define MAX_SEGM_LENGTH (MAX_SEGM_PAGES * PAGE_PHYS) // 0xa0000 // multiple of
// Seems that I introduced a bug into kernel - it can not handle too longsegments
// Seems that I introduced a bug into kernel - it can not handle too long segments
num_write_segments = 0;
for (i = 0; i < SEGMENTS_NUMBER; i++){
......@@ -1217,7 +1358,9 @@ void *jpeg_writer(void *thread_args)
if (write_segments[i].iov_len > 0) {
if (i == 1) { // only for segment 1, if other write thread also is writing second (large) segment
pthread_mutex_lock(&params->writer_mutex); // maybe it will not be needed - mutex
#if THREADS_MODE == 0
#if NUM_WRITER_THREADS > 1 // nothing to do here for a single thread
#if THREADS_MODE == 0
if (params->writev_run[1 - this_thread_index] &&
(params->writev_run_segm[1-this_thread_index] > 0) &&
(get_num_empty(params) >= NUM_NEED_EMPTY)){
......@@ -1226,7 +1369,7 @@ void *jpeg_writer(void *thread_args)
"%s_w02a_: starting second (large) write and signaling main <- @ %07d (+%d)\n", \
dbg_pref, dbg_us, ( dbg_us - dbg_us1)));
}
#elif THREADS_MODE == 1
#elif THREADS_MODE == 1
if (((params->writev_run[1 - this_thread_index] && // other is writing big chunk
(params->writev_run_segm[1-this_thread_index] > 0))
||(params->write_waits_sig[1 - this_thread_index]) // or it is waiting
......@@ -1237,8 +1380,16 @@ void *jpeg_writer(void *thread_args)
"%s_w02a_: starting second (large) write and signaling main <- @ %07d (+%d)\n", \
dbg_pref, dbg_us, ( dbg_us - dbg_us1)));
}
#elif THREADS_MODE > 0
#elif THREADS_MODE > 0
// do nothing. Or do if it really wants?
#endif
#else // single write thread
if (get_num_empty(params) >= NUM_NEED_EMPTY){
pthread_cond_signal(&params->main_cond); // signal to main thread that prepares data
D3(dbg_us=get_fpga_usec(state->fd_fparmsall[0], 0);fprintf(debug_file, \
"%s_w02a_: starting second (large) write and signaling main <- @ %07d (+%d)\n", \
dbg_pref, dbg_us, ( dbg_us - dbg_us1)));
}
#endif
pthread_mutex_unlock(&params->writer_mutex);
}
......@@ -1355,7 +1506,8 @@ void *jpeg_writer(void *thread_args)
// release main thread
reset_segments(state, reset_rem, wpage);
pthread_mutex_lock(&params->writer_mutex);
#if THREADS_MODE == 2
#if NUM_WRITER_THREADS > 1 // nothing to do here for a single thread
#if THREADS_MODE == 2
// signal main when one just ended all writes, and the other is either writing big or waiting
if (((params->writev_run[1 - this_thread_index] && (params->writev_run_segm[1-this_thread_index] > 0))// other is writing big chunk
......@@ -1367,13 +1519,15 @@ void *jpeg_writer(void *thread_args)
"%s_w031_: finished, other is writing second (large), signaling main <- @ %07d (+%d)\n", \
dbg_pref, dbg_us, ( dbg_us - dbg_us1)));
}
#endif
#endif
params->chunk_page_state[wpage] = SEGPAGE_EMPTY;
// update statistics right here !
state->rawdev.last_jpeg_size = image_len_padded;
state->rawdev.total_rec_len += state->rawdev.last_jpeg_size;
D4(fprintf(debug_file, "%sl=%d, total_rec_len=%lld\n",dbg_pref, image_len_padded, state->rawdev.total_rec_len));
D3(fprintf(debug_file, "%sl=%d, total_rec_len=%lld [%d%d%d%d%d%d%d%d]\n",dbg_pref, image_len_padded, state->rawdev.total_rec_len,
params->chunk_page_state[0],params->chunk_page_state[1],params->chunk_page_state[2],params->chunk_page_state[3], \
params->chunk_page_state[4],params->chunk_page_state[5],params->chunk_page_state[6],params->chunk_page_state[7]));
// D6(fprintf(debug_file, "%sCurrent position in block device: %lld\n", dbg_pref, lseek64(blockdev_fd, 0, SEEK_CUR)));
// moved to while waiting
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment