Commit d9fcca99 authored by Mikhail Karpenko's avatar Mikhail Karpenko

WIP: write JPEGs from separate thread in camogm

parent 4e936992
......@@ -31,7 +31,7 @@ IMAGEDIR = $(WWW_PAGES)/images
all: $(PROGS) $(TEST_PROG)
$(PROGS): $(OBJS)
$(CC) $(LDFLAGS) $^ $(LDLIBS) -o $@
$(CC) $(CFLAGS) $(LDFLAGS) $^ $(LDLIBS) -o $@
$(TEST_PROG): $(TEST_SRC:.c=.o)
......
......@@ -64,21 +64,6 @@
char trailer[TRAILER_SIZE] = { 0xff, 0xd9 };
#if 0
const char *exifFileNames[] = { "/dev/exif_exif0", "/dev/exif_exif1",
"/dev/exif_exif2", "/dev/exif_exif3"
};
const char *headFileNames[] = { "/dev/jpeghead0", "/dev/jpeghead1",
"/dev/jpeghead2", "/dev/jpeghead3"
};
const char *ctlFileNames[] = { "/dev/frameparsall0", "/dev/frameparsall1",
"/dev/frameparsall2", "/de/framepars3"
};
const char *circbufFileNames[] = {"/dev/circbuf0", "/dev/circbuf1",
"/dev/circbuf2", "/dev/circbuf3"
};
#else
const char *exifFileNames[] = { DEV393_PATH(DEV393_EXIF0), DEV393_PATH(DEV393_EXIF1),
DEV393_PATH(DEV393_EXIF2), DEV393_PATH(DEV393_EXIF3)
};
......@@ -92,7 +77,6 @@ const char *ctlFileNames[] = { DEV393_PATH(DEV393_FRAMEPARS0), DEV393_PATH(DEV3
const char *circbufFileNames[] = {DEV393_PATH(DEV393_CIRCBUF0), DEV393_PATH(DEV393_CIRCBUF1),
DEV393_PATH(DEV393_CIRCBUF2), DEV393_PATH(DEV393_CIRCBUF3)
};
#endif
int lastDaemonBit[SENSOR_PORTS] = {DAEMON_BIT_CAMOGM};
struct framepars_all_t *frameParsAll[SENSOR_PORTS];
......@@ -167,6 +151,7 @@ static int get_sysfs_name(const char *dev_name, char *sys_name, size_t str_sz, i
static int get_disk_range(const char *name, struct range *rng);
static int set_disk_range(const struct range *rng);
static void get_disk_info(camogm_state *state);
static struct timeval get_fpga_time(const int fd_fparsall, unsigned int port);
int open_files(camogm_state *state);
unsigned long getGPValue(unsigned int port, unsigned long GPNumber);
void setGValue(unsigned int port, unsigned long GNumber, unsigned long value);
......@@ -278,6 +263,10 @@ void camogm_init(camogm_state *state, char *pipe_name, uint16_t port_num)
state->active_chn = ALL_CHN_INACTIVE;
state->rawdev.mmap_default_size = MMAP_CHUNK_SIZE;
state->sock_port = port_num;
state->writer_params.data_ready = false;
state->writer_params.exit_thread = false;
state->writer_params.state = STATE_STOPPED;
}
/**
......@@ -365,7 +354,7 @@ int camogm_start(camogm_state *state)
if (is_chn_active(state, chn)) {
// Check/set circbuf read pointer
D3(fprintf(debug_file, "1: state->cirbuf_rp=0x%x\n", state->cirbuf_rp[chn]));
D3(fprintf(debug_file, "1a: compressed frame number = %i\n", lseek(state->fd_circ[chn], LSEEK_CIRC_GETFRAME, SEEK_END)));
D3(fprintf(debug_file, "1a: compressed frame number = %li\n", lseek(state->fd_circ[chn], LSEEK_CIRC_GETFRAME, SEEK_END)));
if ((state->cirbuf_rp[chn] < 0) || (lseek(state->fd_circ[chn], state->cirbuf_rp[chn], SEEK_SET) < 0) || (lseek(state->fd_circ[chn], LSEEK_CIRC_VALID, SEEK_END) < 0 )) {
D3(fprintf(debug_file, "2: state->cirbuf_rp=0x%x\n", state->cirbuf_rp[chn]));
/* In "greedy" mode try to save as many frames from the circbuf as possible */
......@@ -523,6 +512,10 @@ int sendImageFrame(camogm_state *state)
int * ifp_this = (int*)&(state->this_frame_params[state->port_num]);
int fp;
int port = state->port_num;
struct timeval start_time, end_time;
D6(fprintf(debug_file, "last_error_code = %d\n", state->last_error_code));
start_time = get_fpga_time(state->fd_fparmsall[port], port);
// This is probably needed only for Quicktime (not to exceed already allocated frame index)
if (!state->rawdev_op && (state->frameno >= (state->max_frames))) {
......@@ -693,6 +686,18 @@ int sendImageFrame(camogm_state *state)
state->frames_skip_left[port] += -(state->frames_skip);
}
D3(fprintf(debug_file,"cirbuf_rp to next frame = 0x%x\n", state->cirbuf_rp[port]));
end_time = get_fpga_time(state->fd_fparmsall[port], port);
unsigned int mbps; // write speed, MB/s
unsigned long long time_diff; // time elapsed, in microseconds
time_diff = ((end_time.tv_sec * 1000000 + end_time.tv_usec) - (start_time.tv_sec * 1000000 + start_time.tv_usec));
mbps = ((double)state->rawdev.last_jpeg_size / (double)1048576) / ((double)time_diff / (double)1000000);
D6(fprintf(debug_file, "Frame start time: %ld:%ld; frame end time: %ld:%ld; last frame size: %lu\n",
start_time.tv_sec, start_time.tv_usec,
end_time.tv_sec, end_time.tv_usec,
state->rawdev.last_jpeg_size));
D6(fprintf(debug_file, "Write speed: %d MB/s\n", mbps));
return 0;
}
......@@ -750,7 +755,7 @@ void camogm_free(camogm_state *state)
switch (f) {
case CAMOGM_FORMAT_NONE: break;
case CAMOGM_FORMAT_OGM: camogm_free_ogm(); break;
case CAMOGM_FORMAT_JPEG: camogm_free_jpeg(); break;
case CAMOGM_FORMAT_JPEG: camogm_free_jpeg(state); break;
case CAMOGM_FORMAT_MOV: camogm_free_mov(); break;
}
}
......@@ -1791,7 +1796,7 @@ unsigned int select_port(camogm_state *state)
free_sz = lseek(state->fd_circ[i], LSEEK_CIRC_FREE, SEEK_END);
lseek(state->fd_circ[i], file_pos, SEEK_SET);
if (state->prog_state == STATE_STARTING || state->prog_state == STATE_RUNNING)
D6(fprintf(debug_file, "port %i = %i, ", i, free_sz));
D6(fprintf(debug_file, "port %i = %li, ", i, free_sz));
if ((free_sz < min_sz && free_sz >= 0) || min_sz == -1) {
min_sz = free_sz;
chn = i;
......@@ -1978,7 +1983,7 @@ int main(int argc, char *argv[])
sstate.rawdev.thread_state = STATE_RUNNING;
str_len = strlen(state_name_str);
if (str_len > 0) {
strncpy(&sstate.rawdev.state_path, state_name_str, str_len + 1);
strncpy(sstate.rawdev.state_path, (const char *)state_name_str, str_len + 1);
}
ret = listener_loop(&sstate);
......@@ -2053,3 +2058,19 @@ inline int is_fd_valid(int fd)
{
return fcntl(fd, F_GETFD) != -1 || errno != EBADF;
}
/**
* @brief Get current FPGA time
* @return Time value in \e timeval structure
*/
struct timeval get_fpga_time(const int fd_fparsall, unsigned int port)
{
struct timeval tv;
unsigned long write_data[] = {FRAMEPARS_GETFPGATIME, 0};
write(fd_fparsall, write_data, sizeof(unsigned long) * 2);
tv.tv_sec = getGPValue(port, G_SECONDS);
tv.tv_usec = getGPValue(port, G_MICROSECONDS);
return tv;
}
......@@ -65,6 +65,8 @@
#define MMAP_CHUNK_SIZE 10485760
/** @brief Time interval (in microseconds) for processing commands */
#define COMMAND_LOOP_DELAY 500000
/** @brief File can be split up to this number of chunks */
#define FILE_CHUNKS_NUM 8
/**
* @enum state_flags
......@@ -124,6 +126,7 @@ typedef struct {
uint64_t mmap_offset;
uint64_t file_start;
int64_t total_rec_len;
unsigned long last_jpeg_size;
pthread_t tid;
volatile int thread_state;
unsigned char *disk_mmap;
......@@ -131,6 +134,23 @@ typedef struct {
char state_path[ELPHEL_PATH_MAX];
} rawdev_buffer;
/**
* @struct writer_params
* @brief Contains mutexes and conditional variables associated with disk writing thread
*/
struct writer_params {
int blockdev_fd; ///< file descriptor for open block device where frame will be recorded
pthread_t writer_thread; ///< disk writing thread
pthread_mutex_t writer_mutex; ///< synchronization mutex for main and writing threads
pthread_cond_t writer_cond; ///< conditional variable indicating that writer thread can proceed with new frame
pthread_cond_t main_cond; ///< conditional variable indicating that main thread can update write pointers
bool data_ready; ///< flag indicating that new frame is ready for recording, access to this flag
///< must be protected with #writer_mutex. Set this flag in main thread and reset in
///< disk writing thread.
int last_ret_val; ///< error value return during last frame recording (if any occurred)
bool exit_thread; ///< flag indicating that the writing thread should terminate
int state; ///< the state of disk writing thread
};
/**
* @struct camogm_state
* @brief Holds current state of the running program
......@@ -191,7 +211,7 @@ typedef struct {
int formats; ///< bitmask of used (initialized) formats
int format; ///< output file format
int set_format; ///< output format to set (will be updated after stop)
elph_packet_chunk packetchunks[8];
elph_packet_chunk packetchunks[FILE_CHUNKS_NUM];
int chunk_index;
int buf_overruns[SENSOR_PORTS];
int buf_min[SENSOR_PORTS];
......@@ -221,6 +241,7 @@ typedef struct {
rawdev_buffer rawdev; ///< contains pointers to raw device buffer
unsigned int active_chn; ///< bitmask of active sensor ports
uint16_t sock_port; ///< command socket port number
struct writer_params writer_params; ///< contains control parameters for writing thread
} camogm_state;
extern int debug_level;
......
......@@ -36,6 +36,9 @@
/** State file record format. It includes device path in /dev, starting, current and ending LBAs */
#define STATE_FILE_FORMAT "%s\t%llu\t%llu\t%llu\n"
/* forward declarations */
static void *jpeg_writer(void *thread_args);
/** Get starting and endign LBAs of the partition specified as raw device buffer */
static int get_disk_range(struct range *range)
{
......@@ -162,13 +165,62 @@ static int save_state_file(const rawdev_buffer *rawdev)
return ret;
}
/**
* @brief Initialize synchronization resources for disk writing thread and then start this thread. This function
* is call each time JPEG format is set or changed, thus we need to check the state of writing thread before
* initialization to prevent spawning multiple threads.
* @param[in] state a pointer to a structure containing current state
* @return 0 if initialization was successful and negative value otherwise
*/
int camogm_init_jpeg(camogm_state *state)
{
return 0;
int ret = 0;
int ret_val;
if (state->writer_params.state == STATE_STOPPED) {
ret_val = pthread_cond_init(&state->writer_params.main_cond, NULL);
if (ret_val != 0) {
D0(fprintf(debug_file, "Can not initialize conditional variable for main thread: %s\n", strerror(ret_val)));
ret = -1;
}
ret_val = pthread_cond_init(&state->writer_params.writer_cond, NULL);
if (ret_val != 0) {
D0(fprintf(debug_file, "Can not initialize conditional variable for writing thread: %s\n", strerror(ret_val)));
ret = -1;
}
ret_val = pthread_mutex_init(&state->writer_params.writer_mutex, NULL);
if (ret_val != 0) {
D0(fprintf(debug_file, "Can not initialize mutex for writing thread: %s\n", strerror(ret_val)));
ret = -1;
}
ret_val = pthread_create(&state->writer_params.writer_thread, NULL, jpeg_writer, (void *)state);
if (ret_val != 0) {
D0(fprintf(debug_file, "Can not start writer thread: %s\n", strerror(ret_val)));
ret = -1;
}
}
return ret;
}
void camogm_free_jpeg(void)
/**
* @brief Stop disk writing thread and free its resources. This function is called after the program receives 'exit' command.
* @param[in] state a pointer to a structure containing current state
* @return None
*/
void camogm_free_jpeg(camogm_state *state)
{
pthread_cond_destroy(&state->writer_params.main_cond);
pthread_cond_destroy(&state->writer_params.writer_cond);
pthread_mutex_destroy(&state->writer_params.writer_mutex);
// terminate writing thread
pthread_mutex_lock(&state->writer_params.writer_mutex);
state->writer_params.exit_thread = true;
pthread_cond_signal(&state->writer_params.writer_cond);
pthread_mutex_unlock(&state->writer_params.writer_mutex);
pthread_join(state->writer_params.writer_thread, NULL);
state->writer_params.exit_thread = false;
}
/** Calculate the total length of current frame */
......@@ -212,16 +264,16 @@ int camogm_start_jpeg(camogm_state *state)
}
}
} else {
if (open_state_file(&state->rawdev) != 0) {
D0(fprintf(debug_file, "Could not set write pointer via sysfs, recording will start from the beginning of partition: "
"%s\n", state->rawdev.rawdev_path));
}
state->rawdev.sysfs_fd = open(SYSFS_AHCI_WRITE, O_WRONLY);
D6(fprintf(debug_file, "Open sysfs file: %s\n", SYSFS_AHCI_WRITE));
if (state->rawdev.sysfs_fd < 0) {
D0(fprintf(debug_file, "Error opening sysfs file: %s\n", SYSFS_AHCI_WRITE));
// if (open_state_file(&state->rawdev) != 0) {
// D0(fprintf(debug_file, "Could not set write pointer via sysfs, recording will start from the beginning of partition: "
// "%s\n", state->rawdev.rawdev_path));
// }
state->writer_params.blockdev_fd = open(state->rawdev.rawdev_path, O_WRONLY);
if (state->writer_params.blockdev_fd < 0) {
D0(fprintf(debug_file, "Error opening block device: %s\n", state->rawdev.rawdev_path));
return -CAMOGM_FRAME_FILE_ERR;
}
D6(fprintf(debug_file, "Open block device: %s\n", state->rawdev.rawdev_path));
}
return 0;
......@@ -242,6 +294,7 @@ int camogm_frame_jpeg(camogm_state *state)
int port = state->port_num;
struct frame_data fdata = {0};
sprintf(state->path, "%s%d_%010ld_%06ld.jpeg", state->path_prefix, port, state->this_frame_params[port].timestamp_sec, state->this_frame_params[port].timestamp_usec);
if (!state->rawdev_op) {
l = 0;
for (i = 0; i < (state->chunk_index) - 1; i++) {
......@@ -249,7 +302,6 @@ int camogm_frame_jpeg(camogm_state *state)
chunks_iovec[i].iov_len = state->packetchunks[i + 1].bytes;
l += chunks_iovec[i].iov_len;
}
sprintf(state->path, "%s%d_%010ld_%06ld.jpeg", state->path_prefix, port, state->this_frame_params[port].timestamp_sec, state->this_frame_params[port].timestamp_usec);
if (((state->ivf = open(state->path, O_RDWR | O_CREAT, 0777))) < 0) {
D0(fprintf(debug_file, "Error opening %s for writing, returned %d, errno=%d\n", state->path, state->ivf, errno));
return -CAMOGM_FRAME_FILE_ERR;
......@@ -261,26 +313,31 @@ int camogm_frame_jpeg(camogm_state *state)
close(state->ivf);
return -CAMOGM_FRAME_FILE_ERR;
}
state->rawdev.last_jpeg_size = l;
close(state->ivf);
} else {
D6(fprintf(debug_file, "\ndump iovect array for port %u\n", state->port_num));
for (int i = 0; i < state->chunk_index - 1; i++) {
D6(fprintf(debug_file, "ptr: %p, length: %ld\n", state->packetchunks[i + 1].chunk, state->packetchunks[i + 1].bytes));
}
fdata.sensor_port = port;
fdata.cirbuf_ptr = state->cirbuf_rp[port];
fdata.jpeg_len = state->jpeg_len;
if (state->exif) {
fdata.meta_index = state->this_frame_params[port].meta_index;
fdata.cmd |= DRV_CMD_EXIF;
// next frame is ready for recording, signal this to the writer thread
pthread_mutex_lock(&state->writer_params.writer_mutex);
while (state->writer_params.data_ready)
pthread_cond_wait(&state->writer_params.main_cond, &state->writer_params.writer_mutex);
D6(fprintf(debug_file, "_13a_"));
// proceed if last frame was recorded without errors
if (state->writer_params.last_ret_val == 0) {
state->writer_params.data_ready = true;
pthread_cond_signal(&state->writer_params.writer_cond);
}
fdata.cmd |= DRV_CMD_WRITE;
if (write(state->rawdev.sysfs_fd, &fdata, sizeof(struct frame_data)) < 0) {
D0(fprintf(debug_file, "Can not pass IO vector to driver: %s\n", strerror(errno)));
return -CAMOGM_FRAME_FILE_ERR;
pthread_mutex_unlock(&state->writer_params.writer_mutex);
if (state->writer_params.last_ret_val != 0) {
return state->writer_params.last_ret_val;
}
// update statistics
state->rawdev.total_rec_len += camogm_get_jpeg_size(state);
state->rawdev.last_jpeg_size = camogm_get_jpeg_size(state);
state->rawdev.total_rec_len += state->rawdev.last_jpeg_size;
}
return 0;
......@@ -299,16 +356,64 @@ int camogm_end_jpeg(camogm_state *state)
struct frame_data fdata = {0};
if (state->rawdev_op) {
fdata.cmd = DRV_CMD_FINISH;
if (write(state->rawdev.sysfs_fd, &fdata, sizeof(struct frame_data)) < 0) {
D0(fprintf(debug_file, "Error sending 'finish' command to driver\n"));
}
D6(fprintf(debug_file, "Closing sysfs file %s\n", SYSFS_AHCI_WRITE));
ret = close(state->rawdev.sysfs_fd);
D6(fprintf(debug_file, "Closing block device %s\n", state->rawdev.rawdev_path));
ret = close(state->writer_params.blockdev_fd);
if (ret == -1)
D0(fprintf(debug_file, "Error: %s\n", strerror(errno)));
save_state_file(&state->rawdev);
// save_state_file(&state->rawdev);
}
return ret;
}
/**
* @brief Disk writing thread. This thread holds local copy of a structure containing current state
* of the program and updates it on a signal from main thread every time new frame is ready for recording.
* @param[in] thread_args a pointer to a structure containing current state
* @return None
*/
void *jpeg_writer(void *thread_args)
{
int rslt = 0;
int chunk_index;
ssize_t iovlen, l;
bool process = true;
struct iovec chunks_iovec[FILE_CHUNKS_NUM];
camogm_state *state = (camogm_state *)thread_args;
struct writer_params *params = &state->writer_params;
memset((void *)chunks_iovec, 0, sizeof(struct iovec) * FILE_CHUNKS_NUM);
pthread_mutex_lock(&params->writer_mutex);
params->state = STATE_RUNNING;
while (process) {
while (!params->data_ready && !params->exit_thread) {
pthread_cond_wait(&params->writer_cond, &params->writer_mutex);
}
if (params->exit_thread) {
process = false;
}
if (params->data_ready) {
l = 0;
state->writer_params.last_ret_val = 0;
for (int i = 0; i < (state->chunk_index) - 1; i++) {
chunks_iovec[i].iov_base = state->packetchunks[i + 1].chunk;
chunks_iovec[i].iov_len = state->packetchunks[i + 1].bytes;
l += chunks_iovec[i].iov_len;
}
chunk_index = state->chunk_index;
iovlen = writev(state->writer_params.blockdev_fd, chunks_iovec, chunk_index - 1);
if (iovlen < l) {
D0(fprintf(debug_file, "writev error: %s (returned %li, expected %li)\n", strerror(errno), iovlen, l));
state->writer_params.last_ret_val = -CAMOGM_FRAME_FILE_ERR;
}
// release main thread
params->data_ready = false;
pthread_cond_signal(&params->main_cond);
}
}
params->state = STATE_STOPPED;
pthread_mutex_unlock(&state->writer_params.writer_mutex);
return NULL;
}
......@@ -24,6 +24,6 @@ int camogm_init_jpeg(camogm_state *state);
int camogm_start_jpeg(camogm_state *state);
int camogm_frame_jpeg(camogm_state *state);
int camogm_end_jpeg(camogm_state *state);
void camogm_free_jpeg(void);
void camogm_free_jpeg(camogm_state *state);
#endif /* _CAMOGM_JPEG_H */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment