Commit 840f15d3 authored by Mikhail Karpenko's avatar Mikhail Karpenko

Add 'read_all_files' and 'read_disk' commands

parent 63bf56d0
......@@ -27,7 +27,7 @@ CFLAGS += -Wall -I$(ELPHEL_KERNEL_DIR)/include/elphel
all: $(PROGS)
$(PROGS): $(OBJS)
$(CC) $(LDFLAGS) $^ $(LDLIBS) -logg -pthread -o $@
$(CC) $(LDFLAGS) $^ $(LDLIBS) -logg -pthread -lm -o $@
install: $(PROGS)
$(INSTALL) -d $(INSTDIR)
$(INSTALL) -m $(INSTMODE) -o $(INSTOWNER) -g $(INSTGROUP) $(PROGS) $(INSTDIR)
......
......@@ -231,7 +231,7 @@ void camogm_init(camogm_state *state, char *pipe_name)
state->rawdev.curr_pos_w = state->rawdev.start_pos;
state->rawdev.curr_pos_r = state->rawdev.start_pos;
state->active_chn = ALL_CHN_ACTIVE;
state->rawdev.mmap_size = MMAP_CHUNK_SIZE;
state->rawdev.mmap_default_size = MMAP_CHUNK_SIZE;
}
/**
......@@ -803,7 +803,7 @@ void camogm_set_prefix(camogm_state *state, const char * p, path_type type)
D0(fprintf(debug_file, "WARNING: raw device write initiated\n"));
state->rawdev_op = 1;
/* debug code follows */
state->rawdev.end_pos = 10485760; // 10 Mib
state->rawdev.end_pos = 134217728;
/* end of debug code */
}
}
......
......@@ -60,7 +60,7 @@
#define RAWDEV_START_OFFSET 1024
/** @brief Maximum length of file or raw device path */
#define ELPHEL_PATH_MAX 300
#define MMAP_CHUNK_SIZE 134217728
#define MMAP_CHUNK_SIZE 10485760
/**
* @enum state_flags
......@@ -100,8 +100,10 @@ typedef struct {
uint64_t start_pos;
uint64_t end_pos;
uint64_t curr_pos_w;
uint64_t *disk_mmap;
uint64_t mmap_size;
unsigned char *disk_mmap;
uint64_t mmap_default_size;
uint64_t mmap_current_size;
uint64_t mmap_offset;
volatile uint64_t curr_pos_r;
uint64_t file_start;
pthread_t tid;
......
......@@ -18,6 +18,7 @@
/** @brief This define is needed to use lseek64 and should be set before includes */
#define _LARGEFILE64_SOURCE
#define _XOPEN_SOURCE
#define _XOPEN_SOURCE_EXTENDED
#include <stdio.h>
#include <stdlib.h>
......@@ -31,11 +32,12 @@
#include <errno.h>
#include <time.h>
#include <ctype.h>
#include <math.h>
#include <asm/byteorder.h>
#include <sys/statvfs.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/stat.h>
#include "camogm_read.h"
......@@ -47,6 +49,7 @@
#define CMD_DELIMITER "/?"
#define CMD_BUFF_LEN 1024
#define COMMAND_LOOP_DELAY 500000
#define PAGE_BOUNDARY_MASK 0xffffffffffffe000
/** @brief The size of read buffer in bytes. The data will be read from disk in blocks of this size */
#define PHY_BLK_SZ 4096
/** @brief Include or exclude file start and stop markers from resulting file. This must be set to 1 for JPEG files */
......@@ -149,15 +152,6 @@ enum search_state {
SEARCH_FILE_DATA
};
//enum sock_commands {
// CMD_BUILD_INDEX,
// CMD_GET_INDEX,
// CMD_READ_DISK,
// CMD_READ_FILE,
// CMD_READ_ALL_FILES,
// CMD_STATUS
//};
/**
* @brief Exif data format table.
*
......@@ -252,10 +246,14 @@ struct crb_ptrs {
struct exit_state {
camogm_state *state;
struct disk_idir *idir;
int ret_val;
int *sockfd_const;
int *sockfd_temp;
};
static inline void exit_thread(void *arg);
static void build_index(camogm_state *state, struct disk_idir *idir);
void dump_index_dir(const struct disk_idir *idir)
{
......@@ -726,43 +724,64 @@ static int check_edge_case(const struct iovec *from, const struct iovec *to, con
void send_buffer(int sockfd, unsigned char *buff, size_t sz)
{
size_t bytes_left = sz;
size_t bytes_written = 0;
ssize_t bytes_written = 0;
size_t offset = 0;
while (bytes_left > 0) {
bytes_written = write(sockfd, &buff[offset], bytes_left);
if (bytes_written < 0) {
perror(__func__);
return;
}
bytes_left -= bytes_written;
offset += bytes_written;
}
}
int mmap_disk(rawdev_buffer *rawdev)
int mmap_disk(rawdev_buffer *rawdev, const struct range *range)
{
int ret = 0;
size_t mmap_size = range->to - range->from;
rawdev->rawdev_fd = open(rawdev->rawdev_path, O_RDONLY);
if (rawdev->rawdev_fd < 0) {
return -1;
}
rawdev->disk_mmap = mmap(0, rawdev->mmap_size, PROT_READ, MAP_SHARED, rawdev->rawdev_fd, rawdev->start_pos);
rawdev->disk_mmap = mmap(0, mmap_size, PROT_READ, MAP_SHARED, rawdev->rawdev_fd, range->from);
if (rawdev->disk_mmap == MAP_FAILED) {
rawdev->disk_mmap = NULL;
close(rawdev->rawdev_fd);
return -1;
}
rawdev->mmap_offset = range->from;
rawdev->mmap_current_size = mmap_size;
return ret;
}
int unmmap_disk(rawdev_buffer *rawdev)
int munmap_disk(rawdev_buffer *rawdev)
{
int ret = 0;
if (rawdev->disk_mmap == NULL)
return 0;
if (munmap(rawdev->disk_mmap, rawdev->mmap_size) != 0)
if (munmap(rawdev->disk_mmap, rawdev->mmap_current_size) != 0)
return -1;
if (close(rawdev->rawdev_fd) != 0)
return -1;
rawdev->mmap_offset = 0;
rawdev->disk_mmap = NULL;
return ret;
return 0;
}
bool is_in_range(struct range *range, struct disk_index *indx)
{
if (indx->f_offset >= range->from &&
indx->f_offset <= range->to &&
(indx->f_offset + indx->f_size) <= range->to)
return true;
else
return false;
}
#define PORT_NUMBER 3456
......@@ -831,6 +850,34 @@ void trim_command(char *cmd, ssize_t cmd_len)
}
}
void send_split_file(rawdev_buffer *rawdev, struct disk_index *indx, int sockfd)
{
ssize_t rcntr = 0;
ssize_t scntr = 0;
size_t head_sz = rawdev->end_pos - indx->f_offset;
size_t tail_sz = indx->f_size - head_sz;
uint64_t curr_pos = lseek64(rawdev->rawdev_fd, 0, SEEK_CUR);
unsigned char *buff = malloc(indx->f_size);
if (buff == NULL)
return;
lseek64(rawdev->rawdev_fd, indx->f_offset, SEEK_SET);
while (rcntr < head_sz && rcntr != -1)
rcntr += read(rawdev->rawdev_fd, &buff[rcntr], head_sz - rcntr);
rcntr = 0;
lseek64(rawdev->rawdev_fd, rawdev->start_pos, SEEK_SET);
while (rcntr < tail_sz && rcntr != -1)
rcntr += read(rawdev->rawdev_fd, &buff[head_sz + rcntr], tail_sz - rcntr);
while (scntr < indx->f_size && scntr != -1)
scntr += write(sockfd, &buff[scntr], head_sz - scntr);
fprintf(debug_file, "%s: %d bytes sent\n", __func__, scntr);
lseek64(rawdev->rawdev_fd, curr_pos, SEEK_SET);
}
/**
*
* @param arg
......@@ -839,19 +886,35 @@ void trim_command(char *cmd, ssize_t cmd_len)
void *reader(void *arg)
{
int sockfd, fd;
int disk_chunks;
int cmd;
char cmd_buff[CMD_BUFF_LEN] = {0};
char *cmd_ptr;
bool transfer;
ssize_t cmd_len;
size_t mm_file_start, mm_file_size;
size_t file_cntr;
camogm_state *state = (camogm_state *)arg;
rawdev_buffer *rawdev = &state->rawdev;
unsigned char *disk_buff = NULL;
struct stat stat_buff;
struct range mmap_range;
struct disk_index *disk_indx, *cross_boundary_indx;
struct disk_idir index_dir = {
.head = NULL,
.tail = NULL,
.size = 0
};
struct exit_state exit_state = {
.state = state,
.idir = &index_dir,
.sockfd_const = &sockfd,
.sockfd_temp = &fd,
.ret_val = 0
};
prep_socket(&sockfd);
pthread_cleanup_push(exit_thread, &exit_state);
while (true) {
fd = accept(sockfd, NULL, 0);
if (fd == -1)
......@@ -861,18 +924,104 @@ void *reader(void *arg)
trim_command(cmd_ptr, cmd_len);
while ((cmd = parse_command(&cmd_ptr)) != -2) {
if (cmd >= 0)
D6(fprintf(debug_file, "Got command '%s'\n", cmd_list[cmd]));
D6(fprintf(debug_file, "Got command '%s', number %d\n", cmd_list[cmd], cmd));
switch (cmd) {
case CMD_BUILD_INDEX:
if (index_dir.size != 0) {
fprintf(debug_file, "%s: removing disk index directory", __func__);
delete_idir(&index_dir);
fprintf(debug_file, "... done\n");
}
build_index(state, &index_dir);
D3(fprintf(debug_file, "%d files read from %s\n", index_dir.size, state->rawdev.rawdev_path));
break;
case CMD_GET_INDEX:
dump_index_dir(&index_dir);
break;
case CMD_READ_DISK:
mmap_range.from = rawdev->start_pos & PAGE_BOUNDARY_MASK;
mmap_range.to = rawdev->start_pos + rawdev->mmap_default_size;
disk_chunks = (size_t)ceil((double)(rawdev->end_pos - rawdev->start_pos) / (double)rawdev->mmap_default_size);
transfer = true;
mm_file_start = rawdev->start_pos;
mm_file_size = rawdev->mmap_default_size;
fprintf(debug_file, "Retrieving %d chunks from disk\n", disk_chunks);
close(fd);
while (disk_chunks > 0 && transfer) {
fprintf(debug_file, "Waiting for connection\n");
fd = accept(sockfd, NULL, 0);
if (mmap_disk(rawdev, &mmap_range) == 0) {
send_buffer(fd, &rawdev->disk_mmap[mm_file_start], mm_file_size);
} else {
transfer = false;
D0(fprintf(debug_file, "Unable to map disk to memory region:"
"disk region start = 0x%llx, disk region end = 0x%llx\n", mmap_range.from, mmap_range.to));
}
if (munmap_disk(rawdev) != 0) {
transfer = false;
D0(fprintf(debug_file, "Unable to unmap memory region\n"));
}
fprintf(debug_file, "%d disk chunks left to send\n", disk_chunks);
mm_file_start = 0;
disk_chunks--;
mmap_range.from = mmap_range.to + 1;
mmap_range.to = mmap_range.from + rawdev->mmap_default_size;
if (mmap_range.to > rawdev->end_pos)
mmap_range.to = rawdev->end_pos;
close(fd);
}
break;
case CMD_READ_FILE:
break;
case CMD_READ_ALL_FILES:
if (index_dir.size > 0) {
mmap_range.from = rawdev->start_pos;
mmap_range.to = rawdev->start_pos + rawdev->mmap_default_size;
disk_indx = index_dir.head;
cross_boundary_indx = NULL;
file_cntr = 0;
transfer = true;
close(fd);
while (file_cntr < index_dir.size && disk_indx != NULL) {
if (is_in_range(&mmap_range, disk_indx) && rawdev->disk_mmap != NULL) {
fd = accept(sockfd, NULL, 0);
mm_file_start = disk_indx->f_offset - rawdev->mmap_offset;
send_buffer(fd, &rawdev->disk_mmap[mm_file_start], disk_indx->f_size);
close(fd);
disk_indx = disk_indx->next;
file_cntr++;
} else {
if (munmap_disk(rawdev) == 0) {
mmap_range.from = disk_indx->f_offset & PAGE_BOUNDARY_MASK;
mmap_range.to = mmap_range.from + rawdev->mmap_default_size;
if (mmap_range.to > rawdev->end_pos) {
mmap_range.to = rawdev->end_pos;
}
if (disk_indx->f_offset + disk_indx->f_size <= mmap_range.to) {
if (mmap_disk(rawdev, &mmap_range) < 0) {
disk_indx = NULL;
D0(fprintf(debug_file, "Unable to map disk to memory region:"
" disk region start = 0x%llx, disk region end = 0x%llx\n", mmap_range.from, mmap_range.to));
}
} else {
cross_boundary_indx = disk_indx;
disk_indx = NULL;
}
} else {
disk_indx = NULL;
D0(fprintf(debug_file, "Unable to unmap memory region\n"));
}
}
}
munmap_disk(rawdev);
if (cross_boundary_indx != NULL) {
send_split_file(rawdev, cross_boundary_indx, fd);
close(fd);
}
} else {
D0(fprintf(debug_file, "Index directory does not contain any files. Try to rebuild index "
"directory with 'build_index' command\n"));
}
break;
case CMD_STATUS:
break;
......@@ -880,10 +1029,11 @@ void *reader(void *arg)
D0(fprintf(debug_file, "Unrecognized command is skipped\n"));
}
}
fprintf(debug_file, "Closing connection\n");
close(fd);
if (fstat(fd, &stat_buff) != EBADF)
close(fd);
usleep(COMMAND_LOOP_DELAY);
}
pthread_cleanup_pop(0);
return (void *) 0;
}
......@@ -891,11 +1041,20 @@ void *reader(void *arg)
static inline void exit_thread(void *arg)
{
struct exit_state *s = (struct exit_state *)arg;
struct stat buff;
if (s->state->rawdev.rawdev_fd > 0) {
if (fstat(s->state->rawdev.rawdev_fd, &buff) != EBADF) {
if (s->state->rawdev.disk_mmap != NULL)
munmap(s->state->rawdev.disk_mmap, s->state->rawdev.mmap_current_size);
close(s->state->rawdev.rawdev_fd);
s->state->rawdev.rawdev_fd = -1;
}
if (s->idir->size != 0)
delete_idir(s->idir);
if (fstat(*s->sockfd_const, &buff) != EBADF)
close(*s->sockfd_const);
if (fstat(*s->sockfd_temp, &buff) != EBADF)
close(*s->sockfd_temp);
s->state->rawdev.thread_finished = true;
}
......@@ -910,7 +1069,7 @@ static inline void exit_thread(void *arg)
* calls. The effect of use of normal @b return or @b break to prematurely leave this loop is undefined.
* @todo update description, reorder decision tree
*/
void *build_index(void *arg)
void build_index(camogm_state *state, struct disk_idir *idir)
{
const int include_markers = INCLUDE_MARKERS;
int process;
......@@ -928,20 +1087,13 @@ void *build_index(void *arg)
unsigned char *save_to = NULL;
uint64_t dev_curr_pos = 0;
uint64_t include_st_marker, include_en_marker;
struct disk_idir index_dir = {0};
size_t add_stm_len, add_enm_len;
camogm_state *state = arg;
struct exit_state exit_state = {
.state = state,
.ret_val = 0
};
state->rawdev.rawdev_fd = open(state->rawdev.rawdev_path, O_RDONLY);
if (state->rawdev.rawdev_fd < 0) {
D0(perror(__func__));
D0(fprintf(debug_file, "Error opening raw device %s\n", state->rawdev.rawdev_path));
exit_state.ret_val = -CAMOGM_FRAME_FILE_ERR;
exit_thread(&exit_state);
return;
}
if (include_markers) {
......@@ -960,7 +1112,6 @@ void *build_index(void *arg)
zero_cross = 0;
search_state = SEARCH_SKIP;
idir_result = 0;
pthread_cleanup_push(exit_thread, &exit_state);
while (process) {
rd = read(state->rawdev.rawdev_fd, buff, sizeof(buff));
err = errno;
......@@ -998,22 +1149,22 @@ void *build_index(void *arg)
if (pos_start == MATCH_NOT_FOUND && pos_stop == MATCH_NOT_FOUND) {
// normal condition, search in progress
buff_processed = 1;
D6(fprintf(debug_file, "State 'skip data'\n"));
// D6(fprintf(debug_file, "State 'skip data'\n"));
} else if (pos_start >= 0 && pos_stop == MATCH_NOT_FOUND && search_state == SEARCH_SKIP) {
// normal condition, new file found
search_state = SEARCH_FILE_DATA;
state->rawdev.file_start = dev_curr_pos + pos_start + (save_from - active_buff);
idir_result = save_index(state, &index_dir);
idir_result = save_index(state, idir);
buff_processed = 1;
D6(fprintf(debug_file, "New file found. File start position: %llu\n", state->rawdev.file_start));
D6(fprintf(debug_file, "State 'starting file'\n"));
} else if (pos_start >= 0 && pos_stop == MATCH_NOT_FOUND && search_state == SEARCH_FILE_DATA) {
// error condition (normally should not happen), discard current index and start a new one
buff_processed = 1;
remove_node(&index_dir, index_dir.tail);
remove_node(idir, idir->tail);
if (zero_cross == 0) {
state->rawdev.file_start = dev_curr_pos + pos_start + (save_from - active_buff);
idir_result = save_index(state, &index_dir);
idir_result = save_index(state, idir);
} else {
process = 0;
}
......@@ -1023,7 +1174,7 @@ void *build_index(void *arg)
// normal condition, save current file size to index directory
uint64_t disk_pos = dev_curr_pos + pos_stop + (save_from - active_buff);
search_state = SEARCH_SKIP;
idir_result = stop_index(&index_dir, disk_pos);
idir_result = stop_index(idir, disk_pos);
buff_processed = 1;
if (zero_cross)
process = 0;
......@@ -1035,10 +1186,10 @@ void *build_index(void *arg)
} else if (pos_start >= 0 && pos_stop >= 0 && pos_start > pos_stop && search_state == SEARCH_FILE_DATA) {
// normal condition, start marker following stop marker found - this indicates a new file
uint64_t disk_pos = dev_curr_pos + pos_stop + (save_from - active_buff);
idir_result = stop_index(&index_dir, disk_pos);
idir_result = stop_index(idir, disk_pos);
if (zero_cross == 0) {
state->rawdev.file_start = dev_curr_pos + pos_start + (save_from - active_buff);
idir_result = save_index(state, &index_dir);
idir_result = save_index(state, idir);
save_from = save_from + pos_start + add_stm_len;
// @todo: replace with pointer to current buffer
save_to = buff + rd;
......@@ -1064,7 +1215,7 @@ void *build_index(void *arg)
if (result == MATCH_FOUND) {
search_state = SEARCH_FILE_DATA;
state->rawdev.file_start = dev_curr_pos + pos_start + (save_from - active_buff);
idir_result = save_index(state, &index_dir);
idir_result = save_index(state, idir);
D6(fprintf(debug_file, "File start position: %llu\n", state->rawdev.file_start));
save_from = next_chunk.iov_base + field_markers.second_buff.iov_len;
save_to = next_chunk.iov_base + next_chunk.iov_len;
......@@ -1092,7 +1243,7 @@ void *build_index(void *arg)
if (result == MATCH_FOUND) {
search_state = SEARCH_SKIP;
uint64_t disk_pos = dev_curr_pos + pos_stop + (save_from - active_buff);
idir_result = stop_index(&index_dir, disk_pos);
idir_result = stop_index(idir, disk_pos);
save_from = next_chunk.iov_base + field_markers.second_buff.iov_len;
save_to = next_chunk.iov_base + next_chunk.iov_len;
} else {
......@@ -1118,20 +1269,9 @@ void *build_index(void *arg)
state->rawdev.curr_pos_r = dev_curr_pos;
}
}
pthread_cleanup_pop(0);
D0(fprintf(debug_file, "\n%d files read from %s\n", index_dir.size, state->rawdev.rawdev_path));
if (close(state->rawdev.rawdev_fd) != 0) {
perror("Unable to close raw device: ");
exit_state.ret_val = -CAMOGM_FRAME_FILE_ERR;
exit_thread(&exit_state);
}
state->rawdev.rawdev_fd = -1;
dump_index_dir(&index_dir);
delete_idir(&index_dir);
exit_state.ret_val = 0;
exit_thread(&exit_state);
}
......@@ -35,7 +35,12 @@ struct disk_idir {
size_t size;
};
void *build_index(void *arg);
struct range {
uint64_t from;
uint64_t to;
};
//void *build_index(void *arg);
void *reader(void *arg);
#endif /* _CAMOGM_READ_H */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment