diff --git a/src/camogm.c b/src/camogm.c index ee5d65453e140564106518120eb753b870d2d51f..f69d45a4d9238433d09fe258d8ceb73850bcecab 100644 --- a/src/camogm.c +++ b/src/camogm.c @@ -365,6 +365,7 @@ int camogm_start(camogm_state *state) if (is_chn_active(state, chn)) { // Check/set circbuf read pointer D3(fprintf(debug_file, "1: state->cirbuf_rp=0x%x\n", state->cirbuf_rp[chn])); + D3(fprintf(debug_file, "1a: compressed frame number = %i\n", lseek(state->fd_circ[chn], LSEEK_CIRC_GETFRAME, SEEK_END))); if ((state->cirbuf_rp[chn] < 0) || (lseek(state->fd_circ[chn], state->cirbuf_rp[chn], SEEK_SET) < 0) || (lseek(state->fd_circ[chn], LSEEK_CIRC_VALID, SEEK_END) < 0 )) { D3(fprintf(debug_file, "2: state->cirbuf_rp=0x%x\n", state->cirbuf_rp[chn])); /* In "greedy" mode try to save as many frames from the circbuf as possible */ @@ -659,6 +660,7 @@ int sendImageFrame(camogm_state *state) state->packetchunks[state->chunk_index ].bytes = state->jpeg_len; state->packetchunks[state->chunk_index++].chunk = (unsigned char*)&ccam_dma_buf[state->port_num][state->cirbuf_rp[port] >> 2]; } + D3(fprintf(debug_file, "\tcirbuf_rp = 0x%x\t", state->cirbuf_rp[port])); D3(fprintf(debug_file, "_12_")); state->packetchunks[state->chunk_index ].bytes = 2; state->packetchunks[state->chunk_index++].chunk = (unsigned char*)trailer; @@ -681,6 +683,7 @@ int sendImageFrame(camogm_state *state) // advance frame pointer state->frameno++; state->cirbuf_rp[port] = lseek(state->fd_circ[port], LSEEK_CIRC_NEXT, SEEK_END); + D3(fprintf(debug_file, "\tcompressed frame number: %li\t", lseek(state->fd_circ[port], LSEEK_CIRC_GETFRAME, SEEK_END))); // optionally save it to global read pointer (i.e. for debugging with imgsrv "/pointers") if (state->save_gp) lseek(state->fd_circ[port], LSEEK_CIRC_SETP, SEEK_END); D3(fprintf(debug_file, "_15_\n")); @@ -689,6 +692,7 @@ int sendImageFrame(camogm_state *state) } else if (state->frames_skip < 0) { state->frames_skip_left[port] += -(state->frames_skip); } + D3(fprintf(debug_file,"cirbuf_rp to next frame = 0x%x\n", state->cirbuf_rp[port])); return 0; } diff --git a/src/camogm_test.c b/src/camogm_test.c index a2de5c05fb21e89e74ee4715dbf1f380c3cb3fb6..a78d8268d869994bd94a71f9a20fc334523ae4b5 100644 --- a/src/camogm_test.c +++ b/src/camogm_test.c @@ -1,5 +1,7 @@ /** @brief This define is needed to use lseek64 and should be set before includes */ #define _LARGEFILE64_SOURCE +/** Needed for O_DIRECT */ +#define _GNU_SOURCE #include #include @@ -17,6 +19,7 @@ #include #include #include +#include #include "camogm.h" #include "camogm_read.h" @@ -82,7 +85,16 @@ const char *circbufFileNames[] = {DEV393_PATH(DEV393_CIRCBUF0), DEV393_PATH(DEV3 unsigned long *ccam_dma_buf[SENSOR_PORTS]; int debug_level; FILE* debug_file; -static volatile bool keep_running = true; +static volatile bool keep_running = true; // global flag for stopping threads +pthread_mutex_t writer_mutex = PTHREAD_MUTEX_INITIALIZER; +pthread_mutex_t writer_block = PTHREAD_MUTEX_INITIALIZER; +pthread_cond_t writer_cond = PTHREAD_COND_INITIALIZER; +pthread_cond_t main_cond = PTHREAD_COND_INITIALIZER; +char block_dev[ELPHEL_PATH_MAX]; +bool block_dev_op = true; // write data from writer thread instead of sending direct commands to driver +unsigned long *writer_data_ptr = NULL; // pointer to next data block in circbuf to be written from writer thread +int writer_data_len; // size of next data block to be written from writer thread +bool main_op = true; // allow main thread to proceed with data preparing int open_files(camogm_state *state); void clean_up(camogm_state *state); @@ -125,7 +137,7 @@ void camogm_init(camogm_state *state, char *pipe_name, uint16_t port_num) memset(state, 0, sizeof(camogm_state)); camogm_reset(state); // sets state->buf_overruns =- 1 state->serialno = ipser[0]; - debug_file = stderr; + debug_file = stdout; camogm_debug_level(DEFAULT_DEBUG_LVL); strcpy(state->debug_name, "stderr"); state->exif = DEFAULT_EXIF; @@ -187,7 +199,7 @@ int open_files(camogm_state *state) clean_up(state); return -3; } else { - D0(fprintf(debug_file, "successfully mmap cirbuf region\n")); + D0(fprintf(debug_file, "successfully mmap cirbuf region, ptr = %p\n", (void *)ccam_dma_buf[port])); } return ret; @@ -207,7 +219,20 @@ int sendImageFrame(camogm_state *state) fdata.cmd |= DRV_CMD_EXIF; } fdata.cmd |= DRV_CMD_WRITE_TEST; - ret = write(state->rawdev.sysfs_fd, &fdata, sizeof(struct frame_data)); + if (!block_dev_op) { + ret = write(state->rawdev.sysfs_fd, &fdata, sizeof(struct frame_data)); + } else { + // write to block device, but not using direct driver commands + pthread_mutex_lock(&writer_mutex); + while (!main_op) + pthread_cond_wait(&main_cond, &writer_mutex); + D0(fprintf(debug_file, "Update data pointer\n")); + writer_data_ptr = ccam_dma_buf[port] + state->cirbuf_rp[port]; + writer_data_len = state->jpeg_len; + main_op = false; + pthread_cond_signal(&writer_cond); + pthread_mutex_unlock(&writer_mutex); + } if (ret < 0) { // D0(fprintf(debug_file, "Can not pass IO vector to driver (driver may be busy): %s\r", strerror(errno))); ret = errno; @@ -668,7 +693,9 @@ int listener_loop(camogm_state *state, struct dd_params *dd_params) seconds = difftime(now, dd_params->start_time); if (seconds > 0) wr_speed = mb_written / (unsigned long)seconds; - D0(fprintf(debug_file, "\r%lu MiB written, number of counts left: %04lu, average speed: %u MB/s", +// D0(fprintf(debug_file, "\r%lu MiB written, number of counts left: %04lu, average speed: %u MB/s", +// mb_written, dd_params->block_count, wr_speed)); + D0(fprintf(debug_file, "%lu MiB written, number of counts left: %04lu, average speed: %u MB/s\n", mb_written, dd_params->block_count, wr_speed)); read_stat(mb_written, state->path); } else { @@ -749,6 +776,64 @@ static void start_test(camogm_state *state, const unsigned long max_cntr, const close(fd); } +// writer thread +void *writer(void *params) +{ + int fd; + int ret = 0; + bool process = true; + unsigned char *tmp_buff; + size_t tmp_buff_sz = 1048576; + + ret = posix_memalign((void **)&tmp_buff, sysconf(_SC_PAGE_SIZE), tmp_buff_sz); + if (ret == 0) { + printf("Aligned buffer allocated, ptr = %p\n", tmp_buff); + } else { + printf("Can not allocate aligned buffer\n"); + return (void *)-1; + } + memset(tmp_buff, 1, tmp_buff_sz); + + // open block device for writing + if ((fd = open(block_dev, O_WRONLY)) < 0) { +// if ((fd = open(block_dev, O_WRONLY | O_DIRECT)) < 0) { +// if ((fd = open("/mnt/sda1/thread_write.bin", O_CREAT | O_WRONLY)) < 0) { +// if ((fd = open("/mnt/sda1/thread_write.bin", O_CREAT | O_WRONLY | O_DIRECT)) < 0) { + printf("Can not open block device %s: %s\n", block_dev, strerror(errno)); + return (void *)-1; + } else { + printf("Block device %s is opened from writer thread\n", block_dev); + } + + // start recording + pthread_mutex_lock(&writer_mutex); + while (process && keep_running) { + while ((writer_data_ptr == NULL) && keep_running) + pthread_cond_wait(&writer_cond, &writer_mutex); + if (writer_data_ptr != NULL) { + printf("Emulate write, ptr = %p, size = %i\n", (void *)writer_data_ptr, writer_data_len); +// ret = write(fd, tmp_buff, tmp_buff_sz); + ret = write(fd, writer_data_ptr, writer_data_len); + if (ret < 0) { + printf("Error during recording: %s\n", strerror(errno)); + process = false; + // exit from main thread also + keep_running = false; + } + printf("%i bytes written\n", ret); + writer_data_ptr = NULL; // wait while data will be ready + main_op = true; + pthread_cond_signal(&main_cond); + } + } + pthread_mutex_unlock(&writer_mutex); + // thread is terminating, clean up + close(fd); + printf("Exit from writer\n"); + + return NULL; +} + int main(int argc, char *argv[]) { const char usage[] = "This program is intended for disk write tests\n" \ @@ -780,6 +865,8 @@ int main(int argc, char *argv[]) struct dd_params dd_params = {0}; struct range range; struct range disk_range; + pthread_t writer_thread; + pthread_attr_t attr; if ((argc < 2) || (argv[1][1] == '-')) { printf(usage, argv[0], argv[0]); @@ -790,6 +877,7 @@ int main(int argc, char *argv[]) case 'd': // set path to disk under test strncpy(disk_path, (const char *)optarg, ELPHEL_PATH_MAX - 1); + strncpy(block_dev, (const char *)optarg, ELPHEL_PATH_MAX - 1); break; case 'n': strncpy(pipe_name_str, (const char *)optarg, ELPHEL_PATH_MAX - 1); @@ -846,6 +934,14 @@ int main(int argc, char *argv[]) camogm_init(&sstate, pipe_name_str, port_num); if (!test_mode) { + // spawn writer thread first + pthread_attr_init(&attr); + ret = pthread_create(&writer_thread, &attr, writer, NULL); + if (ret < 0) { + printf("Can not spawn new thread: %s\n", strerror(errno)); + return ret; + } + ret = open_files(&sstate); if (ret < 0) return ret; @@ -871,6 +967,12 @@ int main(int argc, char *argv[]) clean_up(&sstate); ret = EXIT_FAILURE; } + printf("Exit from main\n"); + pthread_mutex_lock(&writer_mutex); + keep_running = false; + pthread_cond_signal(&writer_cond); + pthread_mutex_unlock(&writer_mutex); + pthread_join(writer_thread, NULL); clean_up(&sstate); } else { if (dd_params.block_size != 0) {