Commit 4e936992 authored by Mikhail Karpenko's avatar Mikhail Karpenko

Write from separate thread in test program, test O_DIRECT flag

parent db8cd54c
......@@ -365,6 +365,7 @@ int camogm_start(camogm_state *state)
if (is_chn_active(state, chn)) {
// Check/set circbuf read pointer
D3(fprintf(debug_file, "1: state->cirbuf_rp=0x%x\n", state->cirbuf_rp[chn]));
D3(fprintf(debug_file, "1a: compressed frame number = %i\n", lseek(state->fd_circ[chn], LSEEK_CIRC_GETFRAME, SEEK_END)));
if ((state->cirbuf_rp[chn] < 0) || (lseek(state->fd_circ[chn], state->cirbuf_rp[chn], SEEK_SET) < 0) || (lseek(state->fd_circ[chn], LSEEK_CIRC_VALID, SEEK_END) < 0 )) {
D3(fprintf(debug_file, "2: state->cirbuf_rp=0x%x\n", state->cirbuf_rp[chn]));
/* In "greedy" mode try to save as many frames from the circbuf as possible */
......@@ -659,6 +660,7 @@ int sendImageFrame(camogm_state *state)
state->packetchunks[state->chunk_index ].bytes = state->jpeg_len;
state->packetchunks[state->chunk_index++].chunk = (unsigned char*)&ccam_dma_buf[state->port_num][state->cirbuf_rp[port] >> 2];
}
D3(fprintf(debug_file, "\tcirbuf_rp = 0x%x\t", state->cirbuf_rp[port]));
D3(fprintf(debug_file, "_12_"));
state->packetchunks[state->chunk_index ].bytes = 2;
state->packetchunks[state->chunk_index++].chunk = (unsigned char*)trailer;
......@@ -681,6 +683,7 @@ int sendImageFrame(camogm_state *state)
// advance frame pointer
state->frameno++;
state->cirbuf_rp[port] = lseek(state->fd_circ[port], LSEEK_CIRC_NEXT, SEEK_END);
D3(fprintf(debug_file, "\tcompressed frame number: %li\t", lseek(state->fd_circ[port], LSEEK_CIRC_GETFRAME, SEEK_END)));
// optionally save it to global read pointer (i.e. for debugging with imgsrv "/pointers")
if (state->save_gp) lseek(state->fd_circ[port], LSEEK_CIRC_SETP, SEEK_END);
D3(fprintf(debug_file, "_15_\n"));
......@@ -689,6 +692,7 @@ int sendImageFrame(camogm_state *state)
} else if (state->frames_skip < 0) {
state->frames_skip_left[port] += -(state->frames_skip);
}
D3(fprintf(debug_file,"cirbuf_rp to next frame = 0x%x\n", state->cirbuf_rp[port]));
return 0;
}
......
/** @brief This define is needed to use lseek64 and should be set before includes */
#define _LARGEFILE64_SOURCE
/** Needed for O_DIRECT */
#define _GNU_SOURCE
#include <stdbool.h>
#include <unistd.h>
......@@ -17,6 +19,7 @@
#include <ctype.h>
#include <signal.h>
#include <elphel/ahci_cmd.h>
#include <pthread.h>
#include "camogm.h"
#include "camogm_read.h"
......@@ -82,7 +85,16 @@ const char *circbufFileNames[] = {DEV393_PATH(DEV393_CIRCBUF0), DEV393_PATH(DEV3
unsigned long *ccam_dma_buf[SENSOR_PORTS];
int debug_level;
FILE* debug_file;
static volatile bool keep_running = true;
static volatile bool keep_running = true; // global flag for stopping threads
pthread_mutex_t writer_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t writer_block = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t writer_cond = PTHREAD_COND_INITIALIZER;
pthread_cond_t main_cond = PTHREAD_COND_INITIALIZER;
char block_dev[ELPHEL_PATH_MAX];
bool block_dev_op = true; // write data from writer thread instead of sending direct commands to driver
unsigned long *writer_data_ptr = NULL; // pointer to next data block in circbuf to be written from writer thread
int writer_data_len; // size of next data block to be written from writer thread
bool main_op = true; // allow main thread to proceed with data preparing
int open_files(camogm_state *state);
void clean_up(camogm_state *state);
......@@ -125,7 +137,7 @@ void camogm_init(camogm_state *state, char *pipe_name, uint16_t port_num)
memset(state, 0, sizeof(camogm_state));
camogm_reset(state); // sets state->buf_overruns =- 1
state->serialno = ipser[0];
debug_file = stderr;
debug_file = stdout;
camogm_debug_level(DEFAULT_DEBUG_LVL);
strcpy(state->debug_name, "stderr");
state->exif = DEFAULT_EXIF;
......@@ -187,7 +199,7 @@ int open_files(camogm_state *state)
clean_up(state);
return -3;
} else {
D0(fprintf(debug_file, "successfully mmap cirbuf region\n"));
D0(fprintf(debug_file, "successfully mmap cirbuf region, ptr = %p\n", (void *)ccam_dma_buf[port]));
}
return ret;
......@@ -207,7 +219,20 @@ int sendImageFrame(camogm_state *state)
fdata.cmd |= DRV_CMD_EXIF;
}
fdata.cmd |= DRV_CMD_WRITE_TEST;
if (!block_dev_op) {
ret = write(state->rawdev.sysfs_fd, &fdata, sizeof(struct frame_data));
} else {
// write to block device, but not using direct driver commands
pthread_mutex_lock(&writer_mutex);
while (!main_op)
pthread_cond_wait(&main_cond, &writer_mutex);
D0(fprintf(debug_file, "Update data pointer\n"));
writer_data_ptr = ccam_dma_buf[port] + state->cirbuf_rp[port];
writer_data_len = state->jpeg_len;
main_op = false;
pthread_cond_signal(&writer_cond);
pthread_mutex_unlock(&writer_mutex);
}
if (ret < 0) {
// D0(fprintf(debug_file, "Can not pass IO vector to driver (driver may be busy): %s\r", strerror(errno)));
ret = errno;
......@@ -668,7 +693,9 @@ int listener_loop(camogm_state *state, struct dd_params *dd_params)
seconds = difftime(now, dd_params->start_time);
if (seconds > 0)
wr_speed = mb_written / (unsigned long)seconds;
D0(fprintf(debug_file, "\r%lu MiB written, number of counts left: %04lu, average speed: %u MB/s",
// D0(fprintf(debug_file, "\r%lu MiB written, number of counts left: %04lu, average speed: %u MB/s",
// mb_written, dd_params->block_count, wr_speed));
D0(fprintf(debug_file, "%lu MiB written, number of counts left: %04lu, average speed: %u MB/s\n",
mb_written, dd_params->block_count, wr_speed));
read_stat(mb_written, state->path);
} else {
......@@ -749,6 +776,64 @@ static void start_test(camogm_state *state, const unsigned long max_cntr, const
close(fd);
}
// writer thread
void *writer(void *params)
{
int fd;
int ret = 0;
bool process = true;
unsigned char *tmp_buff;
size_t tmp_buff_sz = 1048576;
ret = posix_memalign((void **)&tmp_buff, sysconf(_SC_PAGE_SIZE), tmp_buff_sz);
if (ret == 0) {
printf("Aligned buffer allocated, ptr = %p\n", tmp_buff);
} else {
printf("Can not allocate aligned buffer\n");
return (void *)-1;
}
memset(tmp_buff, 1, tmp_buff_sz);
// open block device for writing
if ((fd = open(block_dev, O_WRONLY)) < 0) {
// if ((fd = open(block_dev, O_WRONLY | O_DIRECT)) < 0) {
// if ((fd = open("/mnt/sda1/thread_write.bin", O_CREAT | O_WRONLY)) < 0) {
// if ((fd = open("/mnt/sda1/thread_write.bin", O_CREAT | O_WRONLY | O_DIRECT)) < 0) {
printf("Can not open block device %s: %s\n", block_dev, strerror(errno));
return (void *)-1;
} else {
printf("Block device %s is opened from writer thread\n", block_dev);
}
// start recording
pthread_mutex_lock(&writer_mutex);
while (process && keep_running) {
while ((writer_data_ptr == NULL) && keep_running)
pthread_cond_wait(&writer_cond, &writer_mutex);
if (writer_data_ptr != NULL) {
printf("Emulate write, ptr = %p, size = %i\n", (void *)writer_data_ptr, writer_data_len);
// ret = write(fd, tmp_buff, tmp_buff_sz);
ret = write(fd, writer_data_ptr, writer_data_len);
if (ret < 0) {
printf("Error during recording: %s\n", strerror(errno));
process = false;
// exit from main thread also
keep_running = false;
}
printf("%i bytes written\n", ret);
writer_data_ptr = NULL; // wait while data will be ready
main_op = true;
pthread_cond_signal(&main_cond);
}
}
pthread_mutex_unlock(&writer_mutex);
// thread is terminating, clean up
close(fd);
printf("Exit from writer\n");
return NULL;
}
int main(int argc, char *argv[])
{
const char usage[] = "This program is intended for disk write tests\n" \
......@@ -780,6 +865,8 @@ int main(int argc, char *argv[])
struct dd_params dd_params = {0};
struct range range;
struct range disk_range;
pthread_t writer_thread;
pthread_attr_t attr;
if ((argc < 2) || (argv[1][1] == '-')) {
printf(usage, argv[0], argv[0]);
......@@ -790,6 +877,7 @@ int main(int argc, char *argv[])
case 'd':
// set path to disk under test
strncpy(disk_path, (const char *)optarg, ELPHEL_PATH_MAX - 1);
strncpy(block_dev, (const char *)optarg, ELPHEL_PATH_MAX - 1);
break;
case 'n':
strncpy(pipe_name_str, (const char *)optarg, ELPHEL_PATH_MAX - 1);
......@@ -846,6 +934,14 @@ int main(int argc, char *argv[])
camogm_init(&sstate, pipe_name_str, port_num);
if (!test_mode) {
// spawn writer thread first
pthread_attr_init(&attr);
ret = pthread_create(&writer_thread, &attr, writer, NULL);
if (ret < 0) {
printf("Can not spawn new thread: %s\n", strerror(errno));
return ret;
}
ret = open_files(&sstate);
if (ret < 0)
return ret;
......@@ -871,6 +967,12 @@ int main(int argc, char *argv[])
clean_up(&sstate);
ret = EXIT_FAILURE;
}
printf("Exit from main\n");
pthread_mutex_lock(&writer_mutex);
keep_running = false;
pthread_cond_signal(&writer_cond);
pthread_mutex_unlock(&writer_mutex);
pthread_join(writer_thread, NULL);
clean_up(&sstate);
} else {
if (dd_params.block_size != 0) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment