Commit 1ab4818a authored by Mikhail Karpenko's avatar Mikhail Karpenko

Start reading thread right after program start

Program state can be changed from both threads now and
'state->prog_state' modification is protected with mutex
parent eb13c40a
...@@ -215,6 +215,7 @@ void camogm_init(camogm_state *state, char *pipe_name) ...@@ -215,6 +215,7 @@ void camogm_init(camogm_state *state, char *pipe_name)
state->exif = DEFAULT_EXIF; state->exif = DEFAULT_EXIF;
state->frame_lengths = NULL; state->frame_lengths = NULL;
// reading thread has not been started yet, mutex lock is not necessary
state->prog_state = STATE_STOPPED; state->prog_state = STATE_STOPPED;
// kml stuff // kml stuff
...@@ -312,7 +313,9 @@ int camogm_start(camogm_state *state) ...@@ -312,7 +313,9 @@ int camogm_start(camogm_state *state)
} }
state->max_frames = state->set_max_frames; state->max_frames = state->set_max_frames;
state->frames_per_chunk = state->set_frames_per_chunk; state->frames_per_chunk = state->set_frames_per_chunk;
pthread_mutex_lock(&state->mutex);
state->prog_state = STATE_STARTING; state->prog_state = STATE_STARTING;
pthread_mutex_unlock(&state->mutex);
FOR_EACH_PORT(int, chn) { FOR_EACH_PORT(int, chn) {
//! Check/set circbuf read pointer //! Check/set circbuf read pointer
D3(fprintf(debug_file, "1: state->cirbuf_rp=0x%x\n", state->cirbuf_rp[chn])); D3(fprintf(debug_file, "1: state->cirbuf_rp=0x%x\n", state->cirbuf_rp[chn]));
...@@ -453,7 +456,9 @@ int camogm_start(camogm_state *state) ...@@ -453,7 +456,9 @@ int camogm_start(camogm_state *state)
} }
if (state->kml_enable) rslt = camogm_start_kml(state); // will turn on state->kml_used if it can if (state->kml_enable) rslt = camogm_start_kml(state); // will turn on state->kml_used if it can
if (rslt) return rslt; if (rslt) return rslt;
pthread_mutex_lock(&state->mutex);
state->prog_state = STATE_RUNNING; state->prog_state = STATE_RUNNING;
pthread_mutex_unlock(&state->mutex);
D1(fprintf(debug_file, "Started OK\n")); D1(fprintf(debug_file, "Started OK\n"));
return 0; return 0;
} }
...@@ -656,7 +661,9 @@ int camogm_stop(camogm_state *state) ...@@ -656,7 +661,9 @@ int camogm_stop(camogm_state *state)
if (state->prog_state != STATE_STARTING) { if (state->prog_state != STATE_STARTING) {
D2(fprintf(debug_file, "Recording was not running, nothing to stop\n")); D2(fprintf(debug_file, "Recording was not running, nothing to stop\n"));
} else { } else {
pthread_mutex_lock(&state->mutex);
state->prog_state = STATE_STOPPED; state->prog_state = STATE_STOPPED;
pthread_mutex_unlock(&state->mutex);
D1(fprintf(debug_file, "Dropping attempt to start\n")); D1(fprintf(debug_file, "Dropping attempt to start\n"));
} }
return 0; return 0;
...@@ -675,7 +682,9 @@ int camogm_stop(camogm_state *state) ...@@ -675,7 +682,9 @@ int camogm_stop(camogm_state *state)
state->vf = NULL; state->vf = NULL;
if (rslt) return rslt; if (rslt) return rslt;
state->last = 1; state->last = 1;
pthread_mutex_lock(&state->mutex);
state->prog_state = STATE_STOPPED; state->prog_state = STATE_STOPPED;
pthread_mutex_unlock(&state->mutex);
return 0; return 0;
} }
...@@ -1331,20 +1340,14 @@ int parse_cmd(camogm_state *state, FILE* npipe) ...@@ -1331,20 +1340,14 @@ int parse_cmd(camogm_state *state, FILE* npipe)
state->rawdev.rawdev_path[0] = '\0'; state->rawdev.rawdev_path[0] = '\0';
} }
return 28; return 28;
} else if (strcmp(cmd, "rawdev_read") == 0) { } else if (strcmp(cmd, "reader_stop") == 0) {
if (state->rawdev_op && state->prog_state == STATE_STOPPED) { if (state->prog_state == STATE_READING &&
state->prog_state = STATE_READING; state->rawdev.thread_state == STATE_RUNNING) {
} else {
D0(fprintf(debug_file, "Unable to switch state to 'reading' from current state. Check settings.\n"));
}
return 29;
} else if (strcmp(cmd, "rawdev_stop") == 0) {
if (state->prog_state == STATE_READING) {
state->rawdev.thread_state = STATE_CANCEL; state->rawdev.thread_state = STATE_CANCEL;
} else { } else {
D0(fprintf(debug_file, "Reading thread is not running, nothing to stop\n")); D0(fprintf(debug_file, "Reading thread is not running, nothing to stop\n"));
} }
return 30; return 29;
} }
return -1; return -1;
...@@ -1380,7 +1383,6 @@ void clean_up(camogm_state *state) ...@@ -1380,7 +1383,6 @@ void clean_up(camogm_state *state)
*/ */
int listener_loop(camogm_state *state) int listener_loop(camogm_state *state)
{ {
void *tret;
FILE *cmd_file; FILE *cmd_file;
int rslt, ret, cmd, f_ok; int rslt, ret, cmd, f_ok;
int fp0, fp1; int fp0, fp1;
...@@ -1495,35 +1497,9 @@ int listener_loop(camogm_state *state) ...@@ -1495,35 +1497,9 @@ int listener_loop(camogm_state *state)
} // switch camogm_start() } // switch camogm_start()
if ((rslt != 0) && (rslt != CAMOGM_TOO_EARLY) && (rslt != CAMOGM_FRAME_NOT_READY) && (rslt != CAMOGM_FRAME_CHANGED) ) state->last_error_code = rslt; if ((rslt != 0) && (rslt != CAMOGM_TOO_EARLY) && (rslt != CAMOGM_FRAME_NOT_READY) && (rslt != CAMOGM_FRAME_CHANGED) ) state->last_error_code = rslt;
} else if (state->prog_state == STATE_READING) { } else if (state->prog_state == STATE_READING) {
if (state->rawdev.thread_state == STATE_RUNNING) {
if (state->rawdev.thread_finished == true) {
state->rawdev.thread_state = STATE_STOPPED;
state->rawdev.thread_finished = false;
state->prog_state = STATE_STOPPED;
pthread_join(state->rawdev.tid, &tret);
if ((int)tret != 0 && (int)tret != (int)PTHREAD_CANCELED) {
D0(fprintf(debug_file, "Reading thread returned error %d\n", (int)tret));
} else {
D3(fprintf(debug_file, "Reading thread stopped\n"));
}
} else {
usleep(COMMAND_LOOP_DELAY); usleep(COMMAND_LOOP_DELAY);
}
} else if (state->rawdev.thread_state == STATE_STOPPED) {
state->rawdev.thread_state = STATE_RUNNING;
state->rawdev.thread_finished = false;
// if (pthread_create(&state->rawdev.tid, NULL, build_index, state) != 0) {
if (pthread_create(&state->rawdev.tid, NULL, reader, state) != 0) {
state->prog_state = STATE_STOPPED;
state->rawdev.thread_state = STATE_STOPPED;
D0(fprintf(debug_file, "%s:line %d: Can not start new thread, disk index is not built\n", __FILE__, __LINE__));
}
} else if (state->rawdev.thread_state == STATE_CANCEL) {
// cancel reading thread and return to running state waiting for the thread to terminate
state->rawdev.thread_state = STATE_RUNNING;
pthread_cancel(state->rawdev.tid);
}
} else { // not running, not starting } else { // not running, not starting
state->rawdev.thread_state = STATE_RUNNING;
usleep(COMMAND_LOOP_DELAY); // make it longer but interruptible by signals? usleep(COMMAND_LOOP_DELAY); // make it longer but interruptible by signals?
} }
} // while (process) } // while (process)
...@@ -1683,7 +1659,7 @@ int open_files(camogm_state *state) ...@@ -1683,7 +1659,7 @@ int open_files(camogm_state *state)
} }
framePars[port] = frameParsAll[port]->framePars; framePars[port] = frameParsAll[port]->framePars;
aglobalPars[port] = frameParsAll[port]->globalPars; aglobalPars[port] = frameParsAll[port]->globalPars;
#endif /* DESABLE_CODE */ #endif /* DISABLE_CODE */
} }
return ret; return ret;
...@@ -1718,9 +1694,20 @@ int main(int argc, char *argv[]) ...@@ -1718,9 +1694,20 @@ int main(int argc, char *argv[])
} }
camogm_init(&sstate, argv[1]); camogm_init(&sstate, argv[1]);
if (pthread_mutex_init(&sstate.mutex, NULL) != 0) {
perror("Unable to initialize mutex\n");
return EXIT_FAILURE;
}
ret = open_files(&sstate); ret = open_files(&sstate);
if (ret < 0) if (ret < 0)
return ret; return ret;
if (pthread_create(&sstate.rawdev.tid, NULL, reader, &sstate) != 0 ||
pthread_detach(sstate.rawdev.tid) != 0) {
D0(fprintf(debug_file, "%s:line %d: Can not start reading thread in detached state\n", __FILE__, __LINE__));
clean_up(&sstate);
return EXIT_FAILURE;
}
sstate.rawdev.thread_state = STATE_RUNNING;
ret = listener_loop(&sstate); ret = listener_loop(&sstate);
......
...@@ -107,8 +107,7 @@ typedef struct { ...@@ -107,8 +107,7 @@ typedef struct {
volatile uint64_t curr_pos_r; volatile uint64_t curr_pos_r;
uint64_t file_start; uint64_t file_start;
pthread_t tid; pthread_t tid;
int thread_state; volatile int thread_state;
volatile bool thread_finished;
} rawdev_buffer; } rawdev_buffer;
/** /**
...@@ -137,7 +136,8 @@ typedef struct { ...@@ -137,7 +136,8 @@ typedef struct {
int frame_period[SENSOR_PORTS]; ///< in microseconds (1/10 of what is needed for the Ogm header) int frame_period[SENSOR_PORTS]; ///< in microseconds (1/10 of what is needed for the Ogm header)
int width; ///< image width int width; ///< image width
int height; ///< image height int height; ///< image height
int prog_state; ///< program state flag, can be one of #state_flags volatile int prog_state; ///< program state flag, can be one of #state_flags
pthread_mutex_t mutex; ///< mutex for @e prog_state variable; all modifications to the variable must be using this mutex
int last_error_code; int last_error_code;
ogg_stream_state os; ogg_stream_state os;
ogg_page og; ogg_page og;
......
...@@ -942,10 +942,19 @@ void *reader(void *arg) ...@@ -942,10 +942,19 @@ void *reader(void *arg)
fd = accept(sockfd, NULL, 0); fd = accept(sockfd, NULL, 0);
if (fd == -1) if (fd == -1)
continue; continue;
if (state->prog_state == STATE_STOPPED && state->rawdev_op) {
pthread_mutex_lock(&state->mutex);
state->prog_state = STATE_READING;
pthread_mutex_unlock(&state->mutex);
} else {
close(fd);
D0(fprintf(debug_file, "Can not change state of the program, check settings\n"));
continue;
}
cmd_len = read(fd, cmd_buff, sizeof(cmd_buff) - 1); cmd_len = read(fd, cmd_buff, sizeof(cmd_buff) - 1);
cmd_ptr = cmd_buff; cmd_ptr = cmd_buff;
trim_command(cmd_ptr, cmd_len); trim_command(cmd_ptr, cmd_len);
while ((cmd = parse_command(&cmd_ptr)) != -2) { while ((cmd = parse_command(&cmd_ptr)) != -2 && state->rawdev.thread_state != STATE_CANCEL) {
if (cmd >= 0) if (cmd >= 0)
D6(fprintf(debug_file, "Got command '%s', number %d\n", cmd_list[cmd], cmd)); D6(fprintf(debug_file, "Got command '%s', number %d\n", cmd_list[cmd], cmd));
switch (cmd) { switch (cmd) {
...@@ -984,7 +993,7 @@ void *reader(void *arg) ...@@ -984,7 +993,7 @@ void *reader(void *arg)
mm_file_size = rawdev->mmap_default_size - rawdev->start_pos; mm_file_size = rawdev->mmap_default_size - rawdev->start_pos;
send_fnum(fd, disk_chunks); send_fnum(fd, disk_chunks);
close(fd); close(fd);
while (disk_chunks > 0 && transfer) { while (disk_chunks > 0 && transfer && state->rawdev.thread_state != STATE_CANCEL) {
fd = accept(sockfd, NULL, 0); fd = accept(sockfd, NULL, 0);
if (mmap_disk(rawdev, &mmap_range) == 0) { if (mmap_disk(rawdev, &mmap_range) == 0) {
send_buffer(fd, &rawdev->disk_mmap[mm_file_start], mm_file_size); send_buffer(fd, &rawdev->disk_mmap[mm_file_start], mm_file_size);
...@@ -1041,7 +1050,7 @@ void *reader(void *arg) ...@@ -1041,7 +1050,7 @@ void *reader(void *arg)
cross_boundary_indx = NULL; cross_boundary_indx = NULL;
file_cntr = 0; file_cntr = 0;
transfer = true; transfer = true;
while (file_cntr < index_dir.size && disk_indx != NULL) { while (file_cntr < index_dir.size && disk_indx != NULL && state->rawdev.thread_state != STATE_CANCEL) {
if (is_in_range(&mmap_range, disk_indx) && rawdev->disk_mmap != NULL) { if (is_in_range(&mmap_range, disk_indx) && rawdev->disk_mmap != NULL) {
fd = accept(sockfd, NULL, 0); fd = accept(sockfd, NULL, 0);
mm_file_start = disk_indx->f_offset - rawdev->mmap_offset; mm_file_start = disk_indx->f_offset - rawdev->mmap_offset;
...@@ -1090,6 +1099,9 @@ void *reader(void *arg) ...@@ -1090,6 +1099,9 @@ void *reader(void *arg)
} }
if (fstat(fd, &stat_buff) != EBADF) if (fstat(fd, &stat_buff) != EBADF)
close(fd); close(fd);
pthread_mutex_lock(&state->mutex);
state->prog_state = STATE_STOPPED;
pthread_mutex_unlock(&state->mutex);
usleep(COMMAND_LOOP_DELAY); usleep(COMMAND_LOOP_DELAY);
} }
pthread_cleanup_pop(0); pthread_cleanup_pop(0);
...@@ -1114,7 +1126,6 @@ static inline void exit_thread(void *arg) ...@@ -1114,7 +1126,6 @@ static inline void exit_thread(void *arg)
close(*s->sockfd_const); close(*s->sockfd_const);
if (fstat(*s->sockfd_temp, &buff) != EBADF) if (fstat(*s->sockfd_temp, &buff) != EBADF)
close(*s->sockfd_temp); close(*s->sockfd_temp);
s->state->rawdev.thread_finished = true;
} }
/** /**
...@@ -1171,7 +1182,7 @@ void build_index(camogm_state *state, struct disk_idir *idir) ...@@ -1171,7 +1182,7 @@ void build_index(camogm_state *state, struct disk_idir *idir)
zero_cross = 0; zero_cross = 0;
search_state = SEARCH_SKIP; search_state = SEARCH_SKIP;
idir_result = 0; idir_result = 0;
while (process) { while (process && state->rawdev.thread_state != STATE_CANCEL) {
rd = read(state->rawdev.rawdev_fd, buff, sizeof(buff)); rd = read(state->rawdev.rawdev_fd, buff, sizeof(buff));
err = errno; err = errno;
if ((rd > 0) && (dev_curr_pos + rd > state->rawdev.end_pos)) { if ((rd > 0) && (dev_curr_pos + rd > state->rawdev.end_pos)) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment