Commit f0587888 authored by Mikhail Karpenko's avatar Mikhail Karpenko

Merge branch 'multithreaded_aligned'

parents 676ade6a 7e3873de
......@@ -11,7 +11,7 @@ IMAGES = $(GUIDIR)/images/filebrowser-01.gif $(GUIDIR)/images/filebrowser-bo
$(GUIDIR)/images/rec_folder.png $(GUIDIR)/images/up_folder.gif $(GUIDIR)/images/play_audio.png $(GUIDIR)/images/hdd.png
SRCS = camogm.c camogm_ogm.c camogm_jpeg.c camogm_mov.c camogm_kml.c camogm_read.c index_list.c
SRCS = camogm.c camogm_ogm.c camogm_jpeg.c camogm_mov.c camogm_kml.c camogm_read.c index_list.c camogm_align.c
TEST_SRC = camogm_test.c
OBJS = $(SRCS:.c=.o)
......@@ -31,7 +31,7 @@ IMAGEDIR = $(WWW_PAGES)/images
all: $(PROGS) $(TEST_PROG)
$(CC) $(LDFLAGS) $^ $(LDLIBS) -o $@
$(CC) $(CFLAGS) $(LDFLAGS) $^ $(LDLIBS) -o $@
$(TEST_PROG): $(TEST_SRC:.c=.o)
......@@ -64,21 +64,6 @@
char trailer[TRAILER_SIZE] = { 0xff, 0xd9 };
#if 0
const char *exifFileNames[] = { "/dev/exif_exif0", "/dev/exif_exif1",
"/dev/exif_exif2", "/dev/exif_exif3"
const char *headFileNames[] = { "/dev/jpeghead0", "/dev/jpeghead1",
"/dev/jpeghead2", "/dev/jpeghead3"
const char *ctlFileNames[] = { "/dev/frameparsall0", "/dev/frameparsall1",
"/dev/frameparsall2", "/de/framepars3"
const char *circbufFileNames[] = {"/dev/circbuf0", "/dev/circbuf1",
"/dev/circbuf2", "/dev/circbuf3"
const char *exifFileNames[] = { DEV393_PATH(DEV393_EXIF0), DEV393_PATH(DEV393_EXIF1),
......@@ -92,7 +77,6 @@ const char *ctlFileNames[] = { DEV393_PATH(DEV393_FRAMEPARS0), DEV393_PATH(DEV3
const char *circbufFileNames[] = {DEV393_PATH(DEV393_CIRCBUF0), DEV393_PATH(DEV393_CIRCBUF1),
struct framepars_all_t *frameParsAll[SENSOR_PORTS];
......@@ -167,6 +151,7 @@ static int get_sysfs_name(const char *dev_name, char *sys_name, size_t str_sz, i
static int get_disk_range(const char *name, struct range *rng);
static int set_disk_range(const struct range *rng);
static void get_disk_info(camogm_state *state);
static struct timeval get_fpga_time(const int fd_fparsall, unsigned int port);
int open_files(camogm_state *state);
unsigned long getGPValue(unsigned int port, unsigned long GPNumber);
void setGValue(unsigned int port, unsigned long GNumber, unsigned long value);
......@@ -279,6 +264,10 @@ void camogm_init(camogm_state *state, char *pipe_name, uint16_t port_num)
state->active_chn = ALL_CHN_INACTIVE;
state->rawdev.mmap_default_size = MMAP_CHUNK_SIZE;
state->sock_port = port_num;
state->writer_params.data_ready = false;
state->writer_params.exit_thread = false;
state->writer_params.state = STATE_STOPPED;
......@@ -366,6 +355,7 @@ int camogm_start(camogm_state *state)
if (is_chn_active(state, chn)) {
// Check/set circbuf read pointer
D3(fprintf(debug_file, "1: state->cirbuf_rp=0x%x\n", state->cirbuf_rp[chn]));
D3(fprintf(debug_file, "1a: compressed frame number = %li\n", lseek(state->fd_circ[chn], LSEEK_CIRC_GETFRAME, SEEK_END)));
if ((state->cirbuf_rp[chn] < 0) || (lseek(state->fd_circ[chn], state->cirbuf_rp[chn], SEEK_SET) < 0) || (lseek(state->fd_circ[chn], LSEEK_CIRC_VALID, SEEK_END) < 0 )) {
D3(fprintf(debug_file, "2: state->cirbuf_rp=0x%x\n", state->cirbuf_rp[chn]));
/* In "greedy" mode try to save as many frames from the circbuf as possible */
......@@ -526,6 +516,9 @@ int sendImageFrame(camogm_state *state)
int * ifp_this = (int*)&(state->this_frame_params[state->port_num]);
int fp;
int port = state->port_num;
struct timeval start_time, end_time;
// start_time = get_fpga_time(state->fd_fparmsall[port], port);
// This is probably needed only for Quicktime (not to exceed already allocated frame index)
if (!state->rawdev_op && (state->frameno >= (state->max_frames))) {
......@@ -656,13 +649,16 @@ int sendImageFrame(camogm_state *state)
/* copy from the beginning of the buffer to the end of the frame */
state->packetchunks[state->chunk_index ].bytes = state->jpeg_len - (state->circ_buff_size[port] - state->cirbuf_rp[port]);
state->packetchunks[state->chunk_index++].chunk = (unsigned char*)&ccam_dma_buf[state->port_num][0];
state->writer_params.segments = 2;
} else { // single segment
D3(fprintf(debug_file, "_11_"));
/* copy from the beginning of the frame to the end of the frame (no buffer rollovers) */
state->packetchunks[state->chunk_index ].bytes = state->jpeg_len;
state->packetchunks[state->chunk_index++].chunk = (unsigned char*)&ccam_dma_buf[state->port_num][state->cirbuf_rp[port] >> 2];
state->writer_params.segments = 1;
D3(fprintf(debug_file, "\tcirbuf_rp = 0x%x\t", state->cirbuf_rp[port]));
D3(fprintf(debug_file, "_12_"));
state->packetchunks[state->chunk_index ].bytes = 2;
state->packetchunks[state->chunk_index++].chunk = (unsigned char*)trailer;
......@@ -685,6 +681,7 @@ int sendImageFrame(camogm_state *state)
// advance frame pointer
state->cirbuf_rp[port] = lseek(state->fd_circ[port], LSEEK_CIRC_NEXT, SEEK_END);
D3(fprintf(debug_file, "\tcompressed frame number: %li\t", lseek(state->fd_circ[port], LSEEK_CIRC_GETFRAME, SEEK_END)));
// optionally save it to global read pointer (i.e. for debugging with imgsrv "/pointers")
if (state->save_gp) lseek(state->fd_circ[port], LSEEK_CIRC_SETP, SEEK_END);
D3(fprintf(debug_file, "_15_\n"));
......@@ -693,6 +690,19 @@ int sendImageFrame(camogm_state *state)
} else if (state->frames_skip < 0) {
state->frames_skip_left[port] += -(state->frames_skip);
D3(fprintf(debug_file,"cirbuf_rp to next frame = 0x%x\n", state->cirbuf_rp[port]));
// end_time = get_fpga_time(state->fd_fparmsall[port], port);
// unsigned int mbps; // write speed, MB/s
// unsigned long long time_diff; // time elapsed, in microseconds
// time_diff = ((end_time.tv_sec * 1000000 + end_time.tv_usec) - (start_time.tv_sec * 1000000 + start_time.tv_usec));
// mbps = ((double)state->rawdev.last_jpeg_size / (double)1048576) / ((double)time_diff / (double)1000000);
// D6(fprintf(debug_file, "Frame start time: %ld:%ld; frame end time: %ld:%ld; last frame size: %lu\n",
// start_time.tv_sec, start_time.tv_usec,
// end_time.tv_sec, end_time.tv_usec,
// state->rawdev.last_jpeg_size));
// D6(fprintf(debug_file, "Write speed: %d MB/s\n", mbps));
return 0;
......@@ -750,7 +760,7 @@ void camogm_free(camogm_state *state)
switch (f) {
case CAMOGM_FORMAT_OGM: camogm_free_ogm(); break;
case CAMOGM_FORMAT_JPEG: camogm_free_jpeg(); break;
case CAMOGM_FORMAT_JPEG: camogm_free_jpeg(state); break;
case CAMOGM_FORMAT_MOV: camogm_free_mov(); break;
......@@ -875,6 +885,9 @@ void get_disk_info(camogm_state *state)
if (get_disk_range(state->rawdev.rawdev_path, &rng) == 0) {
state->writer_params.lba_start = rng.from;
state->writer_params.lba_end =;
state->writer_params.lba_current = state->writer_params.lba_start;
} else {
D0(fprintf(debug_file, "ERROR: unable to get disk size and starting sector\n"));
......@@ -2029,7 +2042,7 @@ int main(int argc, char *argv[])
sstate.rawdev.thread_state = STATE_RUNNING;
str_len = strlen(state_name_str);
if (str_len > 0) {
strncpy(sstate.rawdev.state_path, state_name_str, str_len + 1);
strncpy(sstate.rawdev.state_path, (const char *)state_name_str, str_len + 1);
ret = listener_loop(&sstate);
......@@ -2104,3 +2117,19 @@ inline int is_fd_valid(int fd)
return fcntl(fd, F_GETFD) != -1 || errno != EBADF;
* @brief Get current FPGA time
* @return Time value in \e timeval structure
struct timeval get_fpga_time(const int fd_fparsall, unsigned int port)
struct timeval tv;
unsigned long write_data[] = {FRAMEPARS_GETFPGATIME, 0};
write(fd_fparsall, write_data, sizeof(unsigned long) * 2);
tv.tv_sec = getGPValue(port, G_SECONDS);
tv.tv_usec = getGPValue(port, G_MICROSECONDS);
return tv;
......@@ -27,7 +27,6 @@
#include <elphel/c313a.h>
#include <elphel/x393_devices.h>
#define CAMOGM_FRAME_NOT_READY 1 ///< frame pointer valid, but not yet acquired
#define CAMOGM_FRAME_INVALID 2 ///< invalid frame pointer
#define CAMOGM_FRAME_CHANGED 3 ///< frame parameters have changed
......@@ -66,6 +65,8 @@
#define MMAP_CHUNK_SIZE 10485760
/** @brief Time interval (in microseconds) for processing commands */
#define COMMAND_LOOP_DELAY 500000
/** @brief File can be split up to this number of chunks */
* @enum state_flags
......@@ -125,6 +126,7 @@ typedef struct {
uint64_t mmap_offset;
uint64_t file_start;
int64_t total_rec_len;
unsigned long last_jpeg_size;
pthread_t tid;
volatile int thread_state;
unsigned char *disk_mmap;
......@@ -132,6 +134,33 @@ typedef struct {
char state_path[ELPHEL_PATH_MAX];
} rawdev_buffer;
* @struct writer_params
* @brief Contains mutexes and conditional variables associated with disk writing thread
struct writer_params {
int blockdev_fd; ///< file descriptor for open block device where frame will be recorded
pthread_t writer_thread; ///< disk writing thread
pthread_mutex_t writer_mutex; ///< synchronization mutex for main and writing threads
pthread_cond_t writer_cond; ///< conditional variable indicating that writer thread can proceed with new frame
pthread_cond_t main_cond; ///< conditional variable indicating that main thread can update write pointers
bool data_ready; ///< flag indicating that new frame is ready for recording, access to this flag
///< must be protected with #writer_mutex. Set this flag in main thread and reset in
///< disk writing thread.
int last_ret_val; ///< error value return during last frame recording (if any occurred)
bool exit_thread; ///< flag indicating that the writing thread should terminate
int state; ///< the state of disk writing thread
int segments; ///< the number of segments in frame
struct iovec *data_chunks; ///< a set of vectors pointing to aligned frame data buffers
struct iovec prev_rem_vect; ///< vector pointing to the remainder of the previous frame
unsigned char *rem_buff; ///< buffer containing the unaligned remainder of the current frame
unsigned char *prev_rem_buff; ///< buffer containing the unaligned remainder of the previous frame
unsigned char *common_buff; ///< buffer for aligned JPEG header
uint64_t lba_start; ///< disk starting LBA
uint64_t lba_current; ///< current write position in LBAs
uint64_t lba_end; ///< disk last LBA
* @struct camogm_state
* @brief Holds current state of the running program
......@@ -192,7 +221,7 @@ typedef struct {
int formats; ///< bitmask of used (initialized) formats
int format; ///< output file format
int set_format; ///< output format to set (will be updated after stop)
elph_packet_chunk packetchunks[8];
elph_packet_chunk packetchunks[FILE_CHUNKS_NUM];
int chunk_index;
int buf_overruns[SENSOR_PORTS];
int buf_min[SENSOR_PORTS];
......@@ -222,7 +251,7 @@ typedef struct {
rawdev_buffer rawdev; ///< contains pointers to raw device buffer
unsigned int active_chn; ///< bitmask of active sensor ports
uint16_t sock_port; ///< command socket port number
struct writer_params writer_params; ///< contains control parameters for writing thread
unsigned int error_stat[SENSOR_PORTS][CAMOGM_ERRNUM]; ///< collect statistics about errors
} camogm_state;
This diff is collapsed.
/** @file camogm_align.h
* @brief Provides frame alignment functions use for recording to block device.
* @copyright Copyright (C) 2017 Elphel, Inc.
* @par <b>License</b>
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <>.
#include <unistd.h>
#include <sys/types.h>
#include "camogm.h"
#define PHY_BLOCK_SIZE 512 ///< Physical disk block size
#define JPEG_MARKER_LEN 2 ///< The size in bytes of JPEG marker
#define JPEG_SIZE_LEN 2 ///< The size in bytes of JPEG marker length field
#define INCLUDE_REM 1 ///< Include REM buffer to total size calculation
#define EXCLUDE_REM 0 ///< Exclude REM buffer from total size calculation
#define MAX_DATA_CHUNKS 9 ///< An array or JPEG frame chunks contains pointers to JPEG leading marker,
///< JPEG header, Exif data if present, stuffing bytes chunk which aligns
///< the frame size to disk sector boundary, JPEG data which
///< can be split into two chunks, align buffers, JPEG
///< trailing marker, and pointer to a buffer containing the remainder of a
///< frame. Nine chunks of data in total.
#define ALIGNMENT_SIZE 32 ///< Align buffers length to this amount of bytes
/** Common buffer should be large enough to contain JPEG header, Exif, some alignment bytes and remainder from previous frame */
///** This structure holds raw device buffer pointers */
//struct drv_pointers {
// uint64_t lba_start; ///< raw buffer starting LBA
// uint64_t lba_end; ///< raw buffer ending LBA
// uint64_t lba_write; ///< current write pointer inside raw buffer
// uint16_t wr_count; ///< the number of LBA to write next time
/** Container structure for frame buffers */
//struct frame_buffers {
// struct fvec exif_buff; ///< Exif buffer
// struct fvec jpheader_buff; ///< JPEG header buffer
// struct fvec trailer_buff; ///< buffer for trailing marker
// struct fvec common_buff; ///< common buffer where other parts are combined
// struct fvec rem_buff; ///< remainder from previous frame
/** Symbolic names for slots in buffer pointers. Buffer alignment function relies on the order of these names, so
* new names can be added but the overall order should not be changed */
enum {
CHUNK_LEADER, ///< pointer to JPEG leading marker
CHUNK_EXIF, ///< pointer to Exif buffer
CHUNK_HEADER, ///< pointer to JPEG header data excluding leading marker
CHUNK_COMMON, ///< pointer to common buffer
CHUNK_DATA_0, ///< pointer to JPEG data
CHUNK_DATA_1, ///< pointer to the second half of JPEG data if a frame crosses circbuf boundary
CHUNK_TRAILER, ///< pointer to JPEG trailing marker
CHUNK_ALIGN, ///< pointer to buffer where the second part of JPEG data should be aligned
CHUNK_REM ///< pointer to buffer containing the remainder of current frame. It will be recorded during next transaction
int init_align_buffers(camogm_state *state);
void deinit_align_buffers(camogm_state *state);
void align_frame(camogm_state *state);
void reset_chunks(struct iovec *vects, int all);
int update_lba(camogm_state *state);
int get_data_buffers(camogm_state *state, struct iovec *mapped, size_t all_sz);
int prep_last_block(camogm_state *state);
off64_t lba_to_offset(uint64_t lba);
#endif /* _CAMOGM_ALIGN_H */
This diff is collapsed.
......@@ -24,6 +24,6 @@ int camogm_init_jpeg(camogm_state *state);
int camogm_start_jpeg(camogm_state *state);
int camogm_frame_jpeg(camogm_state *state);
int camogm_end_jpeg(camogm_state *state);
void camogm_free_jpeg(void);
void camogm_free_jpeg(camogm_state *state);
#endif /* _CAMOGM_JPEG_H */
/** @brief This define is needed to use lseek64 and should be set before includes */
/** Needed for O_DIRECT */
#define _GNU_SOURCE
#include <stdbool.h>
#include <unistd.h>
......@@ -17,6 +19,7 @@
#include <ctype.h>
#include <signal.h>
#include <elphel/ahci_cmd.h>
#include <pthread.h>
#include "camogm.h"
#include "camogm_read.h"
......@@ -82,7 +85,16 @@ const char *circbufFileNames[] = {DEV393_PATH(DEV393_CIRCBUF0), DEV393_PATH(DEV3
unsigned long *ccam_dma_buf[SENSOR_PORTS];
int debug_level;
FILE* debug_file;
static volatile bool keep_running = true;
static volatile bool keep_running = true; // global flag for stopping threads
pthread_mutex_t writer_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t writer_block = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t writer_cond = PTHREAD_COND_INITIALIZER;
pthread_cond_t main_cond = PTHREAD_COND_INITIALIZER;
char block_dev[ELPHEL_PATH_MAX];
bool block_dev_op = true; // write data from writer thread instead of sending direct commands to driver
unsigned long *writer_data_ptr = NULL; // pointer to next data block in circbuf to be written from writer thread
int writer_data_len; // size of next data block to be written from writer thread
bool main_op = true; // allow main thread to proceed with data preparing
int open_files(camogm_state *state);
void clean_up(camogm_state *state);
......@@ -125,7 +137,7 @@ void camogm_init(camogm_state *state, char *pipe_name, uint16_t port_num)
memset(state, 0, sizeof(camogm_state));
camogm_reset(state); // sets state->buf_overruns =- 1
state->serialno = ipser[0];
debug_file = stderr;
debug_file = stdout;
strcpy(state->debug_name, "stderr");
state->exif = DEFAULT_EXIF;
......@@ -187,7 +199,7 @@ int open_files(camogm_state *state)
return -3;
} else {
D0(fprintf(debug_file, "successfully mmap cirbuf region\n"));
D0(fprintf(debug_file, "successfully mmap cirbuf region, ptr = %p\n", (void *)ccam_dma_buf[port]));
return ret;
......@@ -207,7 +219,20 @@ int sendImageFrame(camogm_state *state)
fdata.cmd |= DRV_CMD_EXIF;
fdata.cmd |= DRV_CMD_WRITE_TEST;
if (!block_dev_op) {
ret = write(state->rawdev.sysfs_fd, &fdata, sizeof(struct frame_data));
} else {
// write to block device, but not using direct driver commands
while (!main_op)
pthread_cond_wait(&main_cond, &writer_mutex);
D0(fprintf(debug_file, "Update data pointer\n"));
writer_data_ptr = ccam_dma_buf[port] + state->cirbuf_rp[port];
writer_data_len = state->jpeg_len;
main_op = false;
if (ret < 0) {
// D0(fprintf(debug_file, "Can not pass IO vector to driver (driver may be busy): %s\r", strerror(errno)));
ret = errno;
......@@ -668,7 +693,9 @@ int listener_loop(camogm_state *state, struct dd_params *dd_params)
seconds = difftime(now, dd_params->start_time);
if (seconds > 0)
wr_speed = mb_written / (unsigned long)seconds;
D0(fprintf(debug_file, "\r%lu MiB written, number of counts left: %04lu, average speed: %u MB/s",
// D0(fprintf(debug_file, "\r%lu MiB written, number of counts left: %04lu, average speed: %u MB/s",
// mb_written, dd_params->block_count, wr_speed));
D0(fprintf(debug_file, "%lu MiB written, number of counts left: %04lu, average speed: %u MB/s\n",
mb_written, dd_params->block_count, wr_speed));
read_stat(mb_written, state->path);
} else {
......@@ -749,6 +776,64 @@ static void start_test(camogm_state *state, const unsigned long max_cntr, const
// writer thread
void *writer(void *params)
int fd;
int ret = 0;
bool process = true;
unsigned char *tmp_buff;
size_t tmp_buff_sz = 1048576;
ret = posix_memalign((void **)&tmp_buff, sysconf(_SC_PAGE_SIZE), tmp_buff_sz);
if (ret == 0) {
printf("Aligned buffer allocated, ptr = %p\n", tmp_buff);
} else {
printf("Can not allocate aligned buffer\n");
return (void *)-1;
memset(tmp_buff, 1, tmp_buff_sz);
// open block device for writing
if ((fd = open(block_dev, O_WRONLY)) < 0) {
// if ((fd = open(block_dev, O_WRONLY | O_DIRECT)) < 0) {
// if ((fd = open("/mnt/sda1/thread_write.bin", O_CREAT | O_WRONLY)) < 0) {
// if ((fd = open("/mnt/sda1/thread_write.bin", O_CREAT | O_WRONLY | O_DIRECT)) < 0) {
printf("Can not open block device %s: %s\n", block_dev, strerror(errno));
return (void *)-1;
} else {
printf("Block device %s is opened from writer thread\n", block_dev);
// start recording
while (process && keep_running) {
while ((writer_data_ptr == NULL) && keep_running)
pthread_cond_wait(&writer_cond, &writer_mutex);
if (writer_data_ptr != NULL) {
printf("Emulate write, ptr = %p, size = %i\n", (void *)writer_data_ptr, writer_data_len);
// ret = write(fd, tmp_buff, tmp_buff_sz);
ret = write(fd, writer_data_ptr, writer_data_len);
if (ret < 0) {
printf("Error during recording: %s\n", strerror(errno));
process = false;
// exit from main thread also
keep_running = false;
printf("%i bytes written\n", ret);
writer_data_ptr = NULL; // wait while data will be ready
main_op = true;
// thread is terminating, clean up
printf("Exit from writer\n");
return NULL;
int main(int argc, char *argv[])
const char usage[] = "This program is intended for disk write tests\n" \
......@@ -780,6 +865,8 @@ int main(int argc, char *argv[])
struct dd_params dd_params = {0};
struct range range;
struct range disk_range;
pthread_t writer_thread;
pthread_attr_t attr;
if ((argc < 2) || (argv[1][1] == '-')) {
printf(usage, argv[0], argv[0]);
......@@ -790,6 +877,7 @@ int main(int argc, char *argv[])
case 'd':
// set path to disk under test
strncpy(disk_path, (const char *)optarg, ELPHEL_PATH_MAX - 1);
strncpy(block_dev, (const char *)optarg, ELPHEL_PATH_MAX - 1);
case 'n':
strncpy(pipe_name_str, (const char *)optarg, ELPHEL_PATH_MAX - 1);
......@@ -846,6 +934,14 @@ int main(int argc, char *argv[])
camogm_init(&sstate, pipe_name_str, port_num);
if (!test_mode) {
// spawn writer thread first
ret = pthread_create(&writer_thread, &attr, writer, NULL);
if (ret < 0) {
printf("Can not spawn new thread: %s\n", strerror(errno));
return ret;
ret = open_files(&sstate);
if (ret < 0)
return ret;
......@@ -871,6 +967,12 @@ int main(int argc, char *argv[])
printf("Exit from main\n");
keep_running = false;
pthread_join(writer_thread, NULL);
} else {
if (dd_params.block_size != 0) {
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment