Commit d8ae21c5 authored by Mikhail Karpenko's avatar Mikhail Karpenko

Fix one error with buffer roll over, add debug to rtp_stream

parent 203b583d
/** /**
* @file FILENAME * @file rtp_stream.cpp
* @brief BRIEF DESCRIPTION * @brief Base class for RTP streams
* @copyright Copyright (C) YEAR Elphel Inc. * @copyright Copyright (C) 2017 Elphel Inc.
* @author AUTHOR <EMAIL> * @author AUTHOR <EMAIL>
* *
* @par License: * @par License:
...@@ -21,18 +21,23 @@ ...@@ -21,18 +21,23 @@
#include <arpa/inet.h> #include <arpa/inet.h>
#include <iostream> #include <iostream>
#include "rtp_stream.h" #include "rtp_stream.h"
#include "helper.h" #include "helper.h"
using namespace std; using namespace std;
#define CNAME "elphel353" #define CNAME "elphel393"
#undef RTP_DEBUG //#undef RTP_DEBUG
//#define RTP_DEBUG #define RTP_DEBUG
#ifdef RTP_DEBUG #ifdef RTP_DEBUG
#define D(a) a #define D(s_port, a) \
do { \
cerr << __FILE__ << ": " << __FUNCTION__ << ": " << __LINE__ << ": sensor port: " << s_port << " "; \
a; \
} while (0)
#else #else
#define D(a) #define D(a)
#endif #endif
...@@ -45,15 +50,16 @@ RTP_Stream::RTP_Stream(void) { ...@@ -45,15 +50,16 @@ RTP_Stream::RTP_Stream(void) {
rtcp_socket = NULL; rtcp_socket = NULL;
sem_init(&sem_play, 0, 0); sem_init(&sem_play, 0, 0);
pth_id = -1; pth_id = -1;
packet_num = 0;
} }
RTP_Stream::~RTP_Stream() { RTP_Stream::~RTP_Stream() {
//cerr << "RTP_Stream::~RTP_Stream() for stream " << stream_name << endl; //cerr << "RTP_Stream::~RTP_Stream() for stream " << stream_name << endl;
if(pth_id >= 0) if (pth_id >= 0)
pthread_cancel(pth); pthread_cancel(pth);
if(rtp_socket != NULL) if (rtp_socket != NULL)
delete rtp_socket; delete rtp_socket;
if(rtcp_socket != NULL) if (rtcp_socket != NULL)
delete rtcp_socket; delete rtcp_socket;
sem_destroy(&sem_play); sem_destroy(&sem_play);
pthread_mutex_destroy(&pthm_flow); pthread_mutex_destroy(&pthm_flow);
...@@ -72,9 +78,9 @@ void *RTP_Stream::pthread_f(void *_this) { ...@@ -72,9 +78,9 @@ void *RTP_Stream::pthread_f(void *_this) {
} }
void RTP_Stream::Start(string ip, int port, int ttl) { void RTP_Stream::Start(string ip, int port, int ttl) {
D( cerr << " new " << stream_name << " UDP socket at port: " << port << endl;) D(sensor_port, cerr << " new " << stream_name << " UDP socket at port: " << port << endl);
pthread_mutex_lock(&pthm_flow); pthread_mutex_lock(&pthm_flow);
if(!_play) { if (!_play) {
rtp_socket = new Socket(ip, port, Socket::TYPE_UDP, ttl); rtp_socket = new Socket(ip, port, Socket::TYPE_UDP, ttl);
rtcp_socket = new Socket(ip, port + 1, Socket::TYPE_UDP, ttl); rtcp_socket = new Socket(ip, port + 1, Socket::TYPE_UDP, ttl);
rtp_packets = 0; rtp_packets = 0;
...@@ -92,20 +98,20 @@ D( cerr << " new " << stream_name << " UDP socket at port: " << port << endl;) ...@@ -92,20 +98,20 @@ D( cerr << " new " << stream_name << " UDP socket at port: " << port << endl;)
} }
void RTP_Stream::Stop(void) { void RTP_Stream::Stop(void) {
D( cerr << __FILE__<< ":"<< __FUNCTION__ << ":" <<__LINE__ <<endl;) D(sensor_port, cerr << __FILE__<< ":"<< __FUNCTION__ << ":" <<__LINE__ <<endl);
//cerr << "RTP_Stream::Stop() for stream " << stream_name << " - begin" << endl; //cerr << "RTP_Stream::Stop() for stream " << stream_name << " - begin" << endl;
pthread_mutex_lock(&pthm_flow); pthread_mutex_lock(&pthm_flow);
if(_play) { if (_play) {
//cerr << "RTP_Stream::Stop() for stream " << stream_name << " - in progress" << endl; //cerr << "RTP_Stream::Stop() for stream " << stream_name << " - in progress" << endl;
/// reset semaphore /// reset semaphore
sem_init(&sem_play, 0, 0); sem_init(&sem_play, 0, 0);
_play = false; _play = false;
// delete rtcp_socket; // delete rtcp_socket;
if(rtp_socket != NULL) { if (rtp_socket != NULL) {
delete rtp_socket; delete rtp_socket;
rtp_socket = NULL; rtp_socket = NULL;
} }
if(rtcp_socket != NULL) { if (rtcp_socket != NULL) {
delete rtcp_socket; delete rtcp_socket;
rtcp_socket = NULL; rtcp_socket = NULL;
} }
...@@ -124,17 +130,17 @@ D( cerr << __FILE__<< ":"<< __FUNCTION__ << ":" <<__LINE__ <<endl;) ...@@ -124,17 +130,17 @@ D( cerr << __FILE__<< ":"<< __FUNCTION__ << ":" <<__LINE__ <<endl;)
* @return never * @return never
*/ */
void *RTP_Stream::thread(void) { void *RTP_Stream::thread(void) {
D( cerr << "RTP_Stream::thread(void)" <<endl;) D(sensor_port, cerr << "RTP_Stream::thread(void)" << endl);
for(;;) { for (;;) {
pthread_mutex_lock(&pthm_flow); pthread_mutex_lock(&pthm_flow);
if(_play) { if (_play) {
long f = process(); long f = process();
if(f > 0) if (f > 0)
rtcp(); rtcp();
// process() and rtcp() use sockets // process() and rtcp() use sockets
pthread_mutex_unlock(&pthm_flow); pthread_mutex_unlock(&pthm_flow);
if(f < 0) { if (f < 0) {
D( cerr << __FILE__<< ":"<< __FUNCTION__ << ":" <<__LINE__<< "process exception detected: " << f << endl;) D(sensor_port, cerr << __FILE__<< ":"<< __FUNCTION__ << ":" <<__LINE__<< "process exception detected: " << f << endl);
// cerr << "Stop() from thread for stream " << stream_name << endl; // cerr << "Stop() from thread for stream " << stream_name << endl;
Stop(); Stop();
} }
...@@ -148,14 +154,14 @@ D( cerr << __FILE__<< ":"<< __FUNCTION__ << ":" <<__LINE__<< "process excepti ...@@ -148,14 +154,14 @@ D( cerr << __FILE__<< ":"<< __FUNCTION__ << ":" <<__LINE__<< "process excepti
void RTP_Stream::rtcp(void) { void RTP_Stream::rtcp(void) {
// check time for next one RTCP... // check time for next one RTCP...
if(f_tv.tv_sec == 0 && f_tv.tv_usec == 0) if (f_tv.tv_sec == 0 && f_tv.tv_usec == 0)
return; return;
long td = time_delta_us(f_tv, rtcp_tv); long td = time_delta_us(f_tv, rtcp_tv);
if(td < 0) { if (td < 0) {
rtcp_tv = f_tv; rtcp_tv = f_tv;
return; return;
} }
if(td < rtcp_delay) if (td < rtcp_delay)
return; return;
rtcp_tv = f_tv; rtcp_tv = f_tv;
rtcp_send_sdes(); rtcp_send_sdes();
...@@ -172,61 +178,61 @@ void RTP_Stream::rtcp_send_sr(void) { ...@@ -172,61 +178,61 @@ void RTP_Stream::rtcp_send_sr(void) {
packet[0] = 0x81; packet[0] = 0x81;
packet[1] = 200; // SR packet[1] = 200; // SR
us = htons(((packet_len) / 4) - 1); us = htons(((packet_len) / 4) - 1);
memcpy((void *)&packet[2], (void *)&us, 2); memcpy((void *) &packet[2], (void *) &us, 2);
memcpy((void *)&packet[4], (void *)&SSRC, 4); memcpy((void *) &packet[4], (void *) &SSRC, 4);
// NTP timestamp is a fixed point 32.32 format time // NTP timestamp is a fixed point 32.32 format time
ul = htonl(f_tv.tv_sec); ul = htonl(f_tv.tv_sec);
memcpy((void *)&packet[8], (void *)&ul, 4); memcpy((void *) &packet[8], (void *) &ul, 4);
double d = f_tv.tv_usec; double d = f_tv.tv_usec;
d /= 1000000.0; d /= 1000000.0;
d *= 65536.0; d *= 65536.0;
d *= 4096.0; d *= 4096.0;
uint32_t f = (uint32_t)d; uint32_t f = (uint32_t) d;
if(f > 0x0FFFFFFF) if (f > 0x0FFFFFFF)
f = 0x0FFFFFFF; f = 0x0FFFFFFF;
f <<= 4; f <<= 4;
ul = htonl(f); ul = htonl(f);
memcpy((void *)&packet[12], (void *)&ul, 4); memcpy((void *) &packet[12], (void *) &ul, 4);
ul = htonl(timestamp); ul = htonl(timestamp);
memcpy((void *)&packet[16], (void *)&ul, 4); memcpy((void *) &packet[16], (void *) &ul, 4);
ul = htonl(rtp_packets); ul = htonl(rtp_packets);
memcpy((void *)&packet[20], (void *)&ul, 4); memcpy((void *) &packet[20], (void *) &ul, 4);
ul = htonl(rtp_octets); ul = htonl(rtp_octets);
memcpy((void *)&packet[24], (void *)&ul, 4); memcpy((void *) &packet[24], (void *) &ul, 4);
rtcp_socket->send(packet, packet_len); rtcp_socket->send(packet, packet_len);
} }
void RTP_Stream::rtcp_send_sdes(void) { void RTP_Stream::rtcp_send_sdes(void) {
char packet[8 + 4 + 128]; // by RTP RFC 3550, for SDES RTCP packet needed 8 + 4 + ... bytes, char packet[8 + 4 + 128]; // by RTP RFC 3550, for SDES RTCP packet needed 8 + 4 + ... bytes,
// so get additional 128 bytes for 126 CNAME field // so get additional 128 bytes for 126 CNAME field
int packet_len = 0; int packet_len = 0;
int padding=0; int padding = 0;
uint16_t us; uint16_t us;
const char *cname = CNAME; const char *cname = CNAME;
int cname_len = strlen(cname); int cname_len = strlen(cname);
bzero((void *)packet, 140); //8+4+128 bzero((void *) packet, 140); //8+4+128
// RTCP header // RTCP header
packet[0] = 0x81; packet[0] = 0x81;
packet[1] = 202; packet[1] = 202;
memcpy((void *)&packet[4], (void *)&SSRC, 4); memcpy((void *) &packet[4], (void *) &SSRC, 4);
packet_len += 8; packet_len += 8;
// SDES fields // SDES fields
packet[8] = 0x01; packet[8] = 0x01;
memcpy((void *)&packet[10], (void *)cname, cname_len); memcpy((void *) &packet[10], (void *) cname, cname_len);
packet_len += 2; // + cname_len; packet_len += 2; // + cname_len;
// calculate common length SDES // calculate common length SDES
padding=(cname_len+2)%4; padding = (cname_len + 2) % 4;
if(padding) if (padding)
cname_len += (4-padding); cname_len += (4 - padding);
packet[9] = cname_len; packet[9] = cname_len;
// each chunk MUST be terminated by one or more null octets(RFC3350) // each chunk MUST be terminated by one or more null octets(RFC3350)
packet_len += (cname_len+4); packet_len += (cname_len + 4);
us = htons((packet_len / 4) - 1); us = htons((packet_len / 4) - 1);
memcpy((void *)&packet[2], (void *)&us, 2); memcpy((void *) &packet[2], (void *) &us, 2);
rtcp_socket->send(packet, packet_len); rtcp_socket->send(packet, packet_len);
} }
...@@ -32,6 +32,7 @@ ...@@ -32,6 +32,7 @@
#include <sstream> #include <sstream>
#include <string> #include <string>
#include <elphel/x393_devices.h> #include <elphel/x393_devices.h>
#include <iomanip>
#include "streamer.h" #include "streamer.h"
...@@ -74,6 +75,15 @@ using namespace std; ...@@ -74,6 +75,15 @@ using namespace std;
#define D3(a) #define D3(a)
#endif #endif
#ifdef VIDEO_DEBUG
#define D_FOLLOW(a) \
do { \
a; \
} while (0)
#else
#define D(a)
#endif
//Video *video = NULL; //Video *video = NULL;
#define QTABLES_INCLUDE #define QTABLES_INCLUDE
...@@ -87,7 +97,7 @@ using namespace std; ...@@ -87,7 +97,7 @@ using namespace std;
//int fd_circbuf = 0; //int fd_circbuf = 0;
//int fd_jpeghead = 0; /// to get quantization tables //int fd_jpeghead = 0; /// to get quantization tables
//int fd_fparmsall = 0; //int fd_fparmsall = 0;
int lastDaemonBit = DAEMON_BIT_STREAMER; //int lastDaemonBit = DAEMON_BIT_STREAMER;
//struct framepars_all_t *frameParsAll; //struct framepars_all_t *frameParsAll;
//struct framepars_t *framePars; //struct framepars_t *framePars;
...@@ -124,6 +134,7 @@ Video::Video(int port, Parameters *pars) { ...@@ -124,6 +134,7 @@ Video::Video(int port, Parameters *pars) {
err_msg = "can't mmap " + *circbuf_file_names[sensor_port]; err_msg = "can't mmap " + *circbuf_file_names[sensor_port];
throw runtime_error(err_msg); throw runtime_error(err_msg);
} }
buffer_ptr_end = (unsigned char *)(buffer_ptr + BYTE2DW(buffer_length));
/// Skip several frames if it is just booted /// Skip several frames if it is just booted
/// May get stuck here if compressor is off, it should be enabled externally /// May get stuck here if compressor is off, it should be enabled externally
...@@ -453,10 +464,13 @@ long Video::capture(void) { ...@@ -453,10 +464,13 @@ long Video::capture(void) {
frame_ptr = (char *) ((unsigned long) buffer_ptr + latestAvailableFrame_ptr); frame_ptr = (char *) ((unsigned long) buffer_ptr + latestAvailableFrame_ptr);
frame_len = get_frame_len(latestAvailableFrame_ptr); frame_len = get_frame_len(latestAvailableFrame_ptr);
D3(sensor_port, cerr << "Frame length " << frame_len << endl); D3(sensor_port, cerr << "Frame start byte index: " << frameStartByteIndex << ", Frame length " << frame_len);
// read time stamp // read time stamp
char *ts_ptr = (char *) ((unsigned long) frame_ptr + (long) (((frame_len + CCAM_MMAP_META + 3) & (~0x1f)) + 32 - CCAM_MMAP_META_SEC)); unsigned char *ts_ptr = (unsigned char *) ((unsigned long) frame_ptr + (long) (((frame_len + CCAM_MMAP_META + 3) & (~0x1f)) + 32 - CCAM_MMAP_META_SEC));
if (ts_ptr >= buffer_ptr_end) {
ts_ptr -= buffer_length;
}
unsigned long t[2]; unsigned long t[2];
memcpy(&t, (void *) ts_ptr, 8); memcpy(&t, (void *) ts_ptr, 8);
f_tv.tv_sec = t[0]; f_tv.tv_sec = t[0];
...@@ -465,7 +479,7 @@ long Video::capture(void) { ...@@ -465,7 +479,7 @@ long Video::capture(void) {
struct interframe_params_t curr_frame_params; struct interframe_params_t curr_frame_params;
struct interframe_params_t *fp = &curr_frame_params; struct interframe_params_t *fp = &curr_frame_params;
get_frame_pars(fp, latestAvailableFrame_ptr); get_frame_pars(fp, latestAvailableFrame_ptr);
D3(sensor_port, cerr << "frame_pars->signffff " << fp->signffff << endl); D_FOLLOW(cerr << ", frame_pars->signffff " << fp->signffff << endl);
// See if the frame parameters are the same as were used when starting the stream, // See if the frame parameters are the same as were used when starting the stream,
// otherwise check for up to G_SKIP_DIFF_FRAME older frames and return them instead. // otherwise check for up to G_SKIP_DIFF_FRAME older frames and return them instead.
...@@ -473,6 +487,14 @@ long Video::capture(void) { ...@@ -473,6 +487,14 @@ long Video::capture(void) {
// Each time the latest acquired frame is considered, so we do not need to save frame pointer additionally // Each time the latest acquired frame is considered, so we do not need to save frame pointer additionally
if ((fp->width != used_width) || (fp->height != used_height)) { if ((fp->width != used_width) || (fp->height != used_height)) {
D3(sensor_port, cerr << "Looks like frame size changed, new params: h = " << fp->height << ", w = " << fp->width << endl); D3(sensor_port, cerr << "Looks like frame size changed, new params: h = " << fp->height << ", w = " << fp->width << endl);
D3(sensor_port, cerr << "shoud be h = " << used_height << ", w = " << used_width << endl);
D3(sensor_port, cerr << "latestAvailableFrame_ptr: " << latestAvailableFrame_ptr << endl);
D3(sensor_port, cerr << "Interframe params:" << endl);
unsigned int *iframe_data = (unsigned int *)fp;
for (size_t j = 0; j < sizeof(struct interframe_params_t) / 4; j++)
cerr << setfill('0') << setw(2) << "0x" << hex << iframe_data[j] << " ";
cerr << dec << endl;
for (before = 1; before <= (int) params->getGPValue(G_SKIP_DIFF_FRAME); before++) { for (before = 1; before <= (int) params->getGPValue(G_SKIP_DIFF_FRAME); before++) {
if (((frameStartByteIndex = getFramePars(&frame_pars, before))) if (((frameStartByteIndex = getFramePars(&frame_pars, before)))
&& (frame_pars.width == used_width) && (frame_pars.height == used_height)) { && (frame_pars.width == used_width) && (frame_pars.height == used_height)) {
...@@ -570,10 +592,9 @@ long Video::process(void) { ...@@ -570,10 +592,9 @@ long Video::process(void) {
uint32_t ts; uint32_t ts;
ts = timestamp; ts = timestamp;
ts = htonl(ts); ts = htonl(ts);
D(sensor_port, cerr << "This frame's time stamp: " << timestamp << endl);
long offset = 0; long offset = 0;
void *v_ptr[4];
int v_len[4] = { 0, 0, 0, 0 };
struct iovec iov[4]; struct iovec iov[4];
int vect_num; int vect_num;
bool first = true; bool first = true;
...@@ -639,7 +660,8 @@ long Video::process(void) { ...@@ -639,7 +660,8 @@ long Video::process(void) {
} else { } else {
iov[vect_num++].iov_len = 20; iov[vect_num++].iov_len = 20;
} }
if ((data + packet_len) <= (unsigned char *)(buffer_ptr + BYTE2DW(buffer_length))) { // if ((data + packet_len) <= (unsigned char *)(buffer_ptr + BYTE2DW(buffer_length))) {
if ((data + packet_len) <= buffer_ptr_end) {
iov[vect_num].iov_base = data; iov[vect_num].iov_base = data;
iov[vect_num++].iov_len = packet_len; iov[vect_num++].iov_len = packet_len;
data += packet_len; data += packet_len;
......
...@@ -77,9 +77,11 @@ protected: ...@@ -77,9 +77,11 @@ protected:
// struct timeval f_tv; // struct timeval f_tv;
long buffer_length; long buffer_length;
unsigned long *buffer_ptr; unsigned long *buffer_ptr;
unsigned char *buffer_ptr_end; // pointer to the end of the buffer
void *frame_ptr; void *frame_ptr;
int fd_circbuf; int fd_circbuf;
int fd_jpeghead; int fd_jpeghead;
int lastDaemonBit;
long capture(void); long capture(void);
// bool process(void); // bool process(void);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment