Commit ba98b0cf authored by Andrey Filippov's avatar Andrey Filippov

working version with 2 parallel write threads

parent bff5f99c
This diff is collapsed.
......@@ -18,7 +18,7 @@
#ifndef _CAMOGM_H
#define _CAMOGM_H
#define __USE_GNU // for O_DIRECT
//#define __USE_GNU // for O_DIRECT
//#define USE_POLL
#include <pthread.h>
#include <stdbool.h>
......@@ -30,6 +30,11 @@
#include <syslog.h>
#include <malloc.h> // debugging
#define NUM_WRITER_THREADS 2
#define NUM_NEED_EMPTY (NUM_WRITER_THREADS + 1) // run main thread to prepage more pages if >= empty
#define NUM_NEED_NEMPTY (NUM_WRITER_THREADS + 0) // wake up main thread if there are at least this number of empty pages
#define NUM_FREE_MANY (NUM_WRITER_THREADS + 1) // Set many if get_num_empty() < NUM_FREE_MANY
//#define NODEBUG
#define CAMOGM_FRAME_NOT_READY 1 ///< frame pointer valid, but not yet acquired
#define CAMOGM_FRAME_INVALID 2 ///< invalid frame pointer
......@@ -49,6 +54,8 @@
#define CAMOGM_FORMAT_MOV 3 ///< output as Apple Quicktime
#define CAMOGM_MIN_BUF_FRAMES 2 ///< buffer should accomodate at list this number of frames
#define FLUSH_DEBUG 1 ///< flush debug file after each print
#ifdef NODEBUG
#define D(x)
#define D0(x)
......@@ -60,7 +67,9 @@
#define D6(x)
#define D7(x)
#define DD(x)
#define DFLUSH
#else
# if FLUSH_DEBUG
#define D(x) { if (debug_file && debug_level) { x; fflush(debug_file); } }
#define D0(x) { if (debug_file) { pthread_mutex_lock(&print_mutex); x; fflush(debug_file); pthread_mutex_unlock(&print_mutex); } }
#define D1(x) { if (debug_file && (debug_level > 0)) { pthread_mutex_lock(&print_mutex); x; fflush(debug_file); pthread_mutex_unlock(&print_mutex); } }
......@@ -72,6 +81,21 @@
//#define D7(x) { if (debug_file && (debug_level > 6)) { pthread_mutex_lock(&print_mutex); x; fflush(debug_file); pthread_mutex_unlock(&print_mutex); } }
#define D7(x)
#define DD(x) { if (debug_file) { fprintf(debug_file, "%s:%d:", __FILE__, __LINE__); x; fflush(debug_file); } }
#else // if FLUSH_DEBUG
#define D(x) { if (debug_file && debug_level) { x; } }
#define D0(x) { if (debug_file) { pthread_mutex_lock(&print_mutex); x; pthread_mutex_unlock(&print_mutex); } }
#define D1(x) { if (debug_file && (debug_level > 0)) { pthread_mutex_lock(&print_mutex); x; pthread_mutex_unlock(&print_mutex); } }
#define D2(x) { if (debug_file && (debug_level > 1)) { pthread_mutex_lock(&print_mutex); x; pthread_mutex_unlock(&print_mutex); } }
#define D3(x) { if (debug_file && (debug_level > 2)) { pthread_mutex_lock(&print_mutex); x; pthread_mutex_unlock(&print_mutex); } }
#define D4(x) { if (debug_file && (debug_level > 3)) { pthread_mutex_lock(&print_mutex); x; pthread_mutex_unlock(&print_mutex); } }
#define D5(x) { if (debug_file && (debug_level > 4)) { pthread_mutex_lock(&print_mutex); x; pthread_mutex_unlock(&print_mutex); } }
#define D6(x) { if (debug_file && (debug_level > 5)) { pthread_mutex_lock(&print_mutex); x; pthread_mutex_unlock(&print_mutex); } }
//#define D7(x) { if (debug_file && (debug_level > 6)) { pthread_mutex_lock(&print_mutex); x; pthread_mutex_unlock(&print_mutex); } }
#define D7(x)
#define DD(x) { if (debug_file) { fprintf(debug_file, "%s:%d:", __FILE__, __LINE__); x; } }
#endif // if FLUSH_DEBUG else
#define DFLUSH { if (debug_file ) { pthread_mutex_lock(&print_mutex);fflush(debug_file); pthread_mutex_unlock(&print_mutex); } }
#endif
/** @brief HEADER_SIZE is defined to be larger than actual header (with EXIF) to use compile-time buffer */
......@@ -92,6 +116,8 @@
#define JPEG_TRAILER_LEN 2 ///< The size in bytes of JPEG trailer
#define CIRCBUF_ALIGNMENT_SIZE 32 ///< Align of CIRCBUF entries
#define MIN_USED_SIZE 4000000 // debugging DMA clashes
// Switching CHUNK -> SEGMENT
#define PAGE_PHYS 4096 // 512 // alignment size for O_DIRECT, may be different from LBA block size that is 512
enum segments {
......@@ -101,7 +127,13 @@ enum segments {
SEGMENTS_NUMBER ///< Just to get number of segment types
};
#define SEGMENTS_PAGES 4 // 2
enum segpage_states {
SEGPAGE_EMPTY = 0, ///< page of segments is empty (new or written to disk)
SEGPAGE_FULL, ///< page of segments is prepared for writing
SEGPAGE_BUSY ///< page of segments is in the process of being sent to fisk
};
#define SEGMENTS_PAGES 8 // 4 // 2
#define SEGMENTS_TOTAL ((SEGMENTS_NUMBER) * (SEGMENTS_PAGES))
/** Glue buffer should be large enough to contain:
......@@ -203,29 +235,35 @@ typedef struct {
* @brief Contains mutexes and conditional variables associated with disk writing thread
*/
struct writer_params {
int blockdev_fd; ///< file descriptor for open block device where frame will be recorded
pthread_t writer_thread; ///< disk writing thread
// int blockdev_fd; ///< file descriptor for open block device where frame will be recorded
pthread_t writer_threads [NUM_WRITER_THREADS]; ///< array of disk writer threads
bool writev_run [NUM_WRITER_THREADS]; ///< writev() is active (enable main thread if buffer not full and writew or buf empty)
bool write_waits_sig [NUM_WRITER_THREADS]; ///< This write thread is waiting for signal
bool write_go [NUM_WRITER_THREADS]; ///< This writer thread may go even if other thread already left write_waits_sig state
int writev_run_segm [NUM_WRITER_THREADS]; ///< current writev segment being written per thread (normally segment0 is small, other - big
pthread_mutex_t writer_mutex; ///< synchronization mutex for main and writing threads
pthread_cond_t writer_cond; ///< conditional variable indicating that writer thread can proceed with new frame
pthread_cond_t main_cond; ///< conditional variable indicating that main thread can update write pointers
bool circbuf_many; ///< probably there are many more frames in circbuf
int chunk_page_prep; ///< page of chunks being prepared. Incremented (mod) after data is prepared for raw write
int chunk_page_write; ///< page of chunks to write. Incremented (mod) after recording to disk
bool writev_run; ///< writev() is active (enable main thread if buffer not full and writew or buf empty)
int chunk_page_state[SEGMENTS_PAGES]; ///< segments page state: 0 - empty, 1 - full, 2 - currently writing to disk
uint64_t next_segment_pos; ///< next segment calculated file position (bytes)
uint64_t segment_pos [SEGMENTS_PAGES]; ///< per-segment file offsets
int last_ret_val; ///< error value return during last frame recording (if any occurred)
bool exit_thread; ///< flag indicating that the writing thread should terminate
bool exit_write_threads; ///< flag indicating that the writing threads should terminate and close their files
int state; ///< the state of disk writing thread
struct iovec *data_segments[SEGMENTS_PAGES]; ///< a set of vectors sets pointing to aligned frame data buffers
unsigned char *common_buffs[FILE_CHUNKS_PAGES]; ///< buffer for aligned JPEG header // make multiple?
int dbg_data [SEGMENTS_PAGES][2]; ///< debug data: channel and length of copied tail
unsigned char *glue_buffs[SEGMENTS_PAGES + 1]; ///< glue buffer (end of previous frame + start of this one
// unsigned char *glue_buffs_carry; ///< extra glue buffer to pass from the previous to the next frame
struct iovec glue_carry_vec; ///< current tail pointer of the carry glue segment
uint64_t lba_start; ///< disk starting LBA
uint64_t lba_current; ///< current write position in LBAs
uint64_t lba_end; ///< disk last LBA
time_t stat_update; ///< time when status file was updated
uint64_t stat_update; ///< time when status file was updated
bool dummy_read; ///< enable dummy read cycle (debug feature)
};
/**
* @struct camogm_state
* @brief Holds current state of the running program
......@@ -337,8 +375,8 @@ int waitDaemonEnabled(unsigned int port, int daemonBit);
int isDaemonEnabled(unsigned int port, int daemonBit);
int is_fd_valid(int fd);
int get_fpga_usec(const int fd_fparsall, unsigned int port);
inline void wait_frame_sync(const int fd_fparsall);
uint64_t get_fpga_time64(const int fd_fparsall, unsigned int port);
void wait_frame_sync(const int fd_fparsall);
unsigned long *get_ccam_dma_buf(int port);
#endif /* _CAMOGM_H */
This diff is collapsed.
......@@ -49,6 +49,7 @@ void deinit_align_buffers(camogm_state *state);
void reset_segments(camogm_state *state, int all, int page);
size_t remap_vectors(camogm_state *state); //, struct iovec *chunks);
uint64_t lba_to_offset(uint64_t lba);
uint64_t get_lba_next(const struct writer_params *params);
int vectaligntail(struct iovec *dest);
......
This diff is collapsed.
......@@ -17,7 +17,7 @@
#ifndef _CAMOGM_JPEG_H
#define _CAMOGM_JPEG_H
#define __USE_GNU // for O_DIRECT
//#define __USE_GNU // for O_DIRECT
#include "camogm.h"
......@@ -28,6 +28,10 @@ int camogm_end_jpeg(camogm_state *state);
void camogm_free_jpeg(camogm_state *state);
int open_state_file(const rawdev_buffer *rawdev, uint64_t *current_pos);
ssize_t emul_writev (int fd, const struct iovec *iovec, int count);
int get_write_page(struct writer_params *wparams);
int get_num_empty(struct writer_params *wparams);
int get_num_busy(struct writer_params *wparams);
#endif /* _CAMOGM_JPEG_H */
......@@ -1023,6 +1023,9 @@ void *reader(void *arg)
memset(&index_sparse, 0, sizeof(struct disk_idir));
prep_socket(&sockfd, state->sock_port);
D2(syslog (LOG_INFO, "%s:line %d : Started reader thread ID=%ld @ %07d", \
__FILE__, __LINE__, pthread_self(), get_fpga_usec(state->fd_fparmsall[0], 0)));
pthread_cleanup_push(exit_thread, &exit_state);
while (true) {
fd = accept(sockfd, NULL, 0);
......@@ -1041,8 +1044,13 @@ void *reader(void *arg)
cmd_ptr = cmd_buff;
trim_command(cmd_ptr, cmd_len);
while ((cmd = parse_command(&cmd_ptr)) != -2 && state->rawdev.thread_state != STATE_CANCEL) {
if (cmd >= 0)
D6(fprintf(debug_file, "Got command '%s', number %d\n", cmd_list[cmd], cmd));
D2(syslog (LOG_INFO, "%s:line %d : Got reader command '%s', number %d @ %07d", \
__FILE__, __LINE__, cmd_list[cmd], cmd, get_fpga_usec(state->fd_fparmsall[0], 0)));
D2(fprintf(debug_file, "%s:line %d : Got reader command '%s', number %d @ %07d\n", \
__FILE__, __LINE__, cmd_list[cmd], cmd, get_fpga_usec(state->fd_fparmsall[0], 0)));
// if (cmd >= 0) {
// D6(fprintf(debug_file, "Got command '%s', number %d\n", cmd_list[cmd], cmd));
// }
switch (cmd) {
case CMD_BUILD_INDEX:
// scan raw device buffer and create disk index directory
......@@ -1050,7 +1058,7 @@ void *reader(void *arg)
delete_idir(&index_dir);
}
build_index(state, &index_dir);
D3(fprintf(debug_file, "%d files read from %s\n", index_dir.size, state->rawdev.rawdev_path));
D2(fprintf(debug_file, "%d files read from %s\n", index_dir.size, state->rawdev.rawdev_path));
break;
case CMD_GET_INDEX:
// send the content of disk index directory over socket
......@@ -1073,6 +1081,7 @@ void *reader(void *arg)
// mmap raw device buffer in MMAP_CHUNK_SIZE chunks and send them over socket
mmap_range.from = rawdev->start_pos & PAGE_BOUNDARY_MASK;
mmap_range.to = mmap_range.from + rawdev->mmap_default_size;
D2(fprintf(debug_file, "CMD_READ_DISK from = %llu, to=%llu files read from %s\n", mmap_range.from, mmap_range.to, state->rawdev.rawdev_path));
disk_chunks = (size_t)ceil((double)(rawdev->end_pos - rawdev->start_pos) / (double)rawdev->mmap_default_size);
transfer = true;
mm_file_start = rawdev->start_pos;
......@@ -1105,6 +1114,7 @@ void *reader(void *arg)
}
break;
case CMD_READ_FILE:
D2(fprintf(debug_file, "CMD_READ_FILE from %s\n", state->rawdev.rawdev_path));
// read single file by offset given
if (index_dir.size > 0) {
struct disk_index indx;
......@@ -1116,6 +1126,7 @@ void *reader(void *arg)
break;
case CMD_FIND_FILE: {
// find file by time stamp
D2(fprintf(debug_file, "CMD_FIND_FILE from %s\n", state->rawdev.rawdev_path));
struct disk_index indx;
struct disk_index *indx_ptr = NULL;
if (get_timestamp_args(cmd_ptr, &indx) > 0) {
......@@ -1133,6 +1144,7 @@ void *reader(void *arg)
}
case CMD_NEXT_FILE: {
// read next file after previously found file
D2(fprintf(debug_file, "CMD_NEXT_FILE from %s\n", state->rawdev.rawdev_path));
struct range rng;
struct disk_index *new_indx = NULL;
struct disk_index *indx_ptr = NULL;
......@@ -1166,11 +1178,13 @@ void *reader(void *arg)
break;
}
case CMD_PREV_FILE: {
D2(fprintf(debug_file, "CMD_PREV_FILE from %s\n", state->rawdev.rawdev_path));
break;
}
case CMD_READ_ALL_FILES:
// read files from raw device buffer and send them over socket; the disk index directory
// should be built beforehand
D2(fprintf(debug_file, "CMD_READ_ALL_FILES from %s\n", state->rawdev.rawdev_path));
if (index_dir.size > 0) {
send_fnum(fd, index_dir.size);
close(fd);
......@@ -1235,7 +1249,10 @@ void *reader(void *arg)
usleep(COMMAND_LOOP_DELAY);
}
pthread_cleanup_pop(0);
D2(fprintf(debug_file, "%s:line %d : Exited reader thread ID=%ld @ %07d\n", \
__FILE__, __LINE__, pthread_self(), get_fpga_usec(state->fd_fparmsall[0], 0)));
D2(syslog (LOG_INFO, "%s:line %d : Exited reader thread ID=%ld @ %07d", \
__FILE__, __LINE__, pthread_self(), get_fpga_usec(state->fd_fparmsall[0], 0)));
return (void *) 0;
}
......@@ -1296,7 +1313,7 @@ static void build_index(camogm_state *state, struct disk_idir *idir)
uint64_t include_st_marker, include_en_marker;
size_t add_stm_len, add_enm_len;
struct disk_index *node = NULL;
D2(fprintf(debug_file, "%s:line %d: state->rawdev.rawdev_path=%s\n", __FILE__, __LINE__, state->rawdev.rawdev_path));
state->rawdev.rawdev_fd = open(state->rawdev.rawdev_path, O_RDONLY);
if (state->rawdev.rawdev_fd < 0) {
D0(perror(__func__));
......
......@@ -69,7 +69,7 @@
*/
define('SSD_ROOT', '/mnt/sda1/');
define('ASSUME_EXT4', 'sda1'); // comment out after testing
define('USE_SYSLOG', 1);
//define('USE_SYSLOG', 1);
$cmd = $_GET['cmd'];
$debug = $_GET['debug'];
$debuglev = $_GET['debuglev'];
......@@ -81,7 +81,8 @@ $default_state = "/etc/elphel393/camogm.disk";
//$state_file = "/mnt/sda1/camogm.disk";
$state_file = "/etc/elphel393/camogm.disk";
$start_str = "camogm -n " . $cmd_pipe . " -p " . $cmd_port;
if (defined('USE_SYSLOG')) {
//if (defined('USE_SYSLOG')) {
if (USE_SYSLOG > 0) {
syslog(LOG_NOTICE, "camogm_interface.php:");
foreach ($_GET as $k => $v){
syslog(LOG_NOTICE, $k ." -> ".$v);
......@@ -138,9 +139,9 @@ if ($cmd == "run_camogm")
}
// set fast recording mode if there is at least one suitable partition or revert to legacy 'mov' mode
if (defined('USE_SYSLOG')) syslog(LOG_NOTICE, "before get_raw_dev()");
if (USE_SYSLOG > 0) syslog(LOG_NOTICE, "before get_raw_dev()");
$partitions = get_raw_dev();
if (defined('USE_SYSLOG')) syslog(LOG_NOTICE, "after get_raw_dev()");
if (USE_SYSLOG > 0) syslog(LOG_NOTICE, "after get_raw_dev()");
if (!empty($partitions)) {
reset($partitions);
$cmd_str = 'format=jpeg;' . 'rawdev_path=' . key($partitions) . ';';
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment