Commit d435dddb authored by Oleg Dzhimiev's avatar Oleg Dzhimiev

experimenting writev()

parent 7a28eb0d
......@@ -89,9 +89,12 @@ int main(int argc, char *argv[]) {
}
if (opt != "")
args[opt] = "";
cout << "Parsed command line arguments:" << endl;
for (map<string, string>::iterator it = args.begin(); it != args.end(); it++) {
cerr << "|" << (*it).first << "| == |" << (*it).second << "|" << endl;
}
if ((args_it = args.find("h")) != args.end()) {
print_help(argv);
exit(EXIT_SUCCESS);
......@@ -110,9 +113,10 @@ int main(int argc, char *argv[]) {
audio_en = true;
else
audio_en = false;
cout << "Start thread " << i << endl;
streamers[i] = new Streamer(args, i, audio_en);
cout << "Starting a new streamer thread for sensor port " << i << endl;
streamers[i] = new Streamer(args, i, audio_en);
pthread_attr_init(&attr);
ret_val = pthread_create(&threads[i], &attr, Streamer::pthread_f, (void *) streamers[i]);
if (ret_val != 0) {
......@@ -123,8 +127,10 @@ int main(int argc, char *argv[]) {
}
pthread_attr_destroy(&attr);
}
for (size_t i = 0; i < SENSOR_PORTS; i++)
for (size_t i = 0; i < SENSOR_PORTS; i++) {
pthread_join(threads[i], NULL);
}
return 0;
}
......@@ -90,6 +90,7 @@ void RTP_Stream::Start(string ip, int port, int ttl) {
// rtcp_delay = 2500000; // in usec
// rtcp_delay = 1250000; // in usec
rtcp_delay = 2000000; // in usec
//rtcp_delay = 5000000; // in usec
_play = true;
/// unlock semaphore - 'play' event
sem_post(&sem_play);
......@@ -175,12 +176,23 @@ void RTP_Stream::rtcp_send_sr(void) {
uint32_t ul;
// RTCP header
packet[0] = 0x81;
// the lower 5 bits of packet[0] are RC
// Report Count - we used to send '1' but with 0 reports
// wireshark and vlc were whining about this.
//
// old
//packet[0] = 0x81;
// new
packet[0] = 0x80;
packet[1] = 200; // SR
us = htons(((packet_len) / 4) - 1);
memcpy((void *) &packet[2], (void *) &us, 2);
memcpy((void *) &packet[4], (void *) &SSRC, 4);
// old: this is done differently in live555, see RTCP.cpp
// NTP timestamp is a fixed point 32.32 format time
/*
ul = htonl(f_tv.tv_sec);
memcpy((void *) &packet[8], (void *) &ul, 4);
double d = f_tv.tv_usec;
......@@ -191,7 +203,17 @@ void RTP_Stream::rtcp_send_sr(void) {
if (f > 0x0FFFFFFF)
f = 0x0FFFFFFF;
f <<= 4;
ul = htonl(f);
*/
// new (as in RTCP.cpp) 2019/02/12
ul = htonl(f_tv.tv_sec+0x83AA7E80);
memcpy((void *) &packet[8], (void *) &ul, 4);
double d = (f_tv.tv_usec/15625.0)*0x04000000;
ul = htonl((unsigned)(d+0.5));
// end new
memcpy((void *) &packet[12], (void *) &ul, 4);
ul = htonl(timestamp);
memcpy((void *) &packet[16], (void *) &ul, 4);
......
......@@ -181,12 +181,15 @@ void RTSP_Server::main(void) {
D(static int count = 0;)
bool to_poll = true;
// Parameters *params = Parameters::instance();
while (true) {
if (to_poll) {
int poll_rez = Socket::poll(s, 500);
D( {if (count < 5) {
cerr << "poll..." << endl;
cerr << "Poll..." << endl;
count++;
}
});
......@@ -204,12 +207,23 @@ void RTSP_Server::main(void) {
continue;
}
}
to_poll = true;
for (list<Socket *>::iterator it = s.begin(); it != s.end(); it++) {
Socket::state state = (*it)->state_refresh();
if (state == Socket::STATE_IN || state == Socket::STATE_DISCONNECT) {
D2(cerr << endl << "something happen on the socket!" << endl;)
if (state == Socket::STATE_IN){
D2(cerr << "Socket status: STATE_IN" << endl;)
}else{
D2(cerr << "Socket status: STATE_DISCONNECT" << endl;)
}
if (*it == socket_main_1 || *it == socket_main_2) { // || *it == socket_main_3) {
D2(cerr << "Socket type: main" << endl;)
// Socket *in = socket_main->accept();
Socket *in = (*it)->accept();
if (in) {
......@@ -220,7 +234,7 @@ void RTSP_Server::main(void) {
}
} else {
// check for remove closed socket !
D2(cerr << "was with non-main socket" << endl;)
D2(cerr << "Socket type: not main" << endl;)
if (!process(*it)) {
D2(cerr << "process failed - remove it!" << endl;)
D2(fprintf(stderr, "delete: 0x%p\n", *it);)
......
......@@ -19,6 +19,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _XOPEN_SOURCE_EXTENDED 1
#include <string>
#include <iostream>
#include <cstdlib>
......@@ -35,6 +37,15 @@
#include <net/if.h>
#include <sys/ioctl.h>
#include <sys/uio.h>
#include <sys/epoll.h>
#include <sys/ioctl.h>
#include <linux/sockios.h>
#include <stdio.h>
#include <sys/time.h>
using namespace std;
#include "socket.h"
......@@ -191,6 +202,8 @@ Socket::Socket(string _ip, int _port, stype _type, int ttl) {
port = _port;
is_multicast = false;
int fileflags;
D(cerr << "new socket..." << endl;)
_state = STATE_EMPTY;
......@@ -219,6 +232,7 @@ D( cerr << "TCP ::bind() == " << t; if(t != 0) cerr << "; errno == " << strerro
break;
}
case TYPE_UDP: {
//fd = socket(PF_INET, SOCK_DGRAM | SOCK_NONBLOCK, 0);
fd = socket(PF_INET, SOCK_DGRAM, 0);
struct sockaddr_in saddr;
memset(&saddr, 0, sizeof(struct sockaddr_in));
......@@ -241,6 +255,36 @@ D( cerr << "::connect() == " << t << endl;)
//cerr << "try to set TTL to value == " << ttl << endl;
setsockopt(fd, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl));
}
int fl;
/*
fl = fcntl(fd, F_GETFL);
//printf("Socket flags: 0x%08x\n",fl);
fcntl(fd, F_SETFL, fl | O_NONBLOCK);
*/
fl = fcntl(fd, F_GETFL);
printf("Socket flags: 0x%08x\n",fl);
// set output buffer here
/*
int obuf = 163840*4;
int rbuf = 0;
socklen_t optlen = sizeof(rbuf);
setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &obuf, sizeof(obuf));
getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &rbuf, &optlen);
cerr << "SO_SND_BUF == " << rbuf << endl;
*/
/*
if (fileflags = fcntl(fd, F_GETFL, 0) == -1){
cerr << "fcntl F_GETFL" << endl;
}
if (fcntl(fd, F_SETFL, fileflags | FNDELAY) == -1){
cerr << "fcntl F_SETFL, FNDELAY" << endl;
}
*/
//setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &obuf, sizeof(obuf));
break;
}
default:
......@@ -265,40 +309,80 @@ Socket::~Socket() {
int Socket::poll(list<Socket *> &s, int timeout) {
struct pollfd *pfd;
ssize_t s_size = s.size();
D2(cerr << "Socket::poll()..." << endl;)
//D2(cerr << "Socket::poll()..." << endl;)
pfd = (struct pollfd *)malloc(sizeof(struct pollfd) * s_size);
memset(pfd, 0, sizeof(struct pollfd) * s_size);
int i = 0;
D2(cerr << "socket.fd == ";)
D2(cerr << "socket.fd == ";)
for(list<Socket *>::iterator it = s.begin(); it != s.end(); it++, i++) {
pfd[i].fd = (*it)->fd;
D2(cerr << pfd[i].fd << "; ";)
// pfd[i].events = 0xFFFF;
D2(cerr << pfd[i].fd << "; ";)
//pfd[i].events = 0xFFFF;
pfd[i].events = POLLIN;
pfd[i].revents = 0x00;
}
D2(cerr << endl;)
D2(cerr << endl;)
int p = ::poll(pfd, s_size, timeout);
i = 0;
for(list<Socket *>::iterator it = s.begin(); it != s.end(); it++, i++) {
(*it)->_state = STATE_EMPTY;
D2(cerr << "revents == " << pfd[i].revents << "; POLLIN == " << POLLIN << endl;)
//D2(cerr << "revents == " << pfd[i].revents << "; POLLIN == " << POLLIN << endl;)
if(pfd[i].revents & POLLIN) {
(*it)->_state = STATE_IN;
D2(cerr << "STATE_IN; fd == " << (*it)->fd << "; revents == " << pfd[i].revents << endl;)
D2(cerr << "setting STATE_IN; fd == " << (*it)->fd << "; revents == " << pfd[i].revents << endl;)
}
if(pfd[i].revents & POLLHUP) {
// if(pfd[i].revents & POLLHUP || pfd[i].revents & POLLERR) {
//if(pfd[i].revents & POLLHUP || pfd[i].revents & POLLERR) {
(*it)->_state = STATE_DISCONNECT;
//D2(cerr << "STATE_DISCONNECT; fd == " << (*it)->fd << "; revents == " << pfd[i].revents << endl;)
cerr << "STATE_DISCONNECT; fd == " << (*it)->fd << "; revents == " << pfd[i].revents << endl;
//D2(cerr << "STATE_DISCONNECT; fd == " << (*it)->fd << "; revents == " << pfd[i].revents << endl;)
cerr << "STATE_DISCONNECT; fd == " << (*it)->fd << "; revents == " << pfd[i].revents << endl;
}
}
free((void *)pfd);
return p;
}
int Socket::pollout(list<Socket *> &s, int timeout) {
struct pollfd *pfd;
ssize_t s_size = s.size();
D2(cerr << "Socket::pollout()..." << endl;)
pfd = (struct pollfd *)malloc(sizeof(struct pollfd) * s_size);
memset(pfd, 0, sizeof(struct pollfd) * s_size);
int i = 0;
//D2(cerr << "socket.fd == ";)
for(list<Socket *>::iterator it = s.begin(); it != s.end(); it++, i++) {
pfd[i].fd = (*it)->fd;
D2(cerr << "pollout socket's fd = " << pfd[i].fd << "; ";)
pfd[i].events = POLLOUT | POLLERR;
pfd[i].revents = 0x00;
}
D2(cerr << endl;)
int p = ::poll(pfd, s_size, timeout);
i = 0;
for(list<Socket *>::iterator it = s.begin(); it != s.end(); it++, i++) {
/*
if(pfd[i].revents & POLLIN) {
cerr << "it's POLLOUT" << endl;
}
*/
if(pfd[i].revents & POLLERR) {
cerr << "it's POLLERR" << endl;
}
}
free((void *)pfd);
return p;
}
void Socket::listen(int in) {
long sock_flags;
......@@ -421,12 +505,104 @@ bool Socket::send3v(void **v_ptr, int *v_len) {
return false;
}
#define EPOLL_TIMEOUT 1000
void poll_wait(int fd, int events)
{
int n;
struct pollfd pollfds[1];
memset((char *) &pollfds, 0, sizeof(pollfds));
pollfds[0].fd = fd;
pollfds[0].events = events;
n = poll(pollfds, 1, -1);
if (n < 0) {
cerr << "POLL FAILED HARD" << endl;
}
}
bool Socket::send_vect(const struct iovec *iov, int num)
{
bool ret_val = false;
/*
// epoll_wait begin
int epfd = epoll_create1(0);
struct epoll_event event;
memset(&event, 0, sizeof(event));
event.data.fd = fd;
event.events = EPOLLOUT;
epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event);
int num_ready = epoll_wait(epfd, &event, 20, EPOLL_TIMEOUT);
if(num_ready<0){
cerr << "epoll_wait() returned " << num_ready << endl;
}
close(epfd);
// epoll_wait end
*/
/*
int socket_fd, result;
fd_set writeset;
do {
FD_ZERO(&writeset);
FD_SET(fd, &writeset);
result = select(fd + 1, NULL, &writeset, NULL, NULL);
} while (result == -1 && errno == EINTR);
if (result<0){
cerr << "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!FAIL!!!!!!!!!!!!!!!!!!!!!!!" << endl;
}
*/
if (::writev(fd, iov, num))
ret_val = true;
int used = 0;
if (ioctl(fd, SIOCOUTQ, &used)<0){
printf("IOCTL ERROR");
}
//while (pending>0) {
// ioctl(fd, SIOCOUTQ, &pending);
//}
//cerr << iov[0].iov_base << " " << iov[1].iov_base << endl;
//unsigned char * p = (unsigned char *) iov[0].iov_base;
//cerr << "writev" << endl;
// this is a sequence number
unsigned short * p = ((unsigned short *) iov[0].iov_base);
timeval t0;
gettimeofday(&t0, NULL);
printf("writev %06d %lu.%06d %d\n",ntohs(p[1]),t0.tv_sec,t0.tv_usec,used);
int res = 0;
poll_wait(fd, POLLOUT|POLLERR);
gettimeofday(&t0, NULL);
printf(" %lu.%06d\n",t0.tv_sec,t0.tv_usec);
res = ::writev(fd, iov, num);
poll_wait(fd, POLLOUT|POLLERR);
gettimeofday(&t0, NULL);
printf(" %lu.%06d\n",t0.tv_sec,t0.tv_usec);
if (res<0){
//cerr << "writev() failed" << endl;
printf("writev failed: %d\n",res);
return false;
}else{
//cerr << "wrote: " << res << endl;
printf("wrote: %d (errno = %d)\n",res, errno);
/*
if (ioctl(fd, SIOCOUTQ, &used)<0){
printf("IOCTL ERROR 2");
}
*/
return true;
}
return ret_val;
}
......@@ -63,6 +63,7 @@ public:
bool send_vect(const struct iovec *iov, int num);
static int poll(list<Socket *> &s, int timeout = -1);
static int pollout(list<Socket *> &s, int timeout = -1);
void listen(int in);
Socket *accept(void);
// bool connect(void);
......@@ -88,6 +89,8 @@ protected:
// int fd;
state _state;
pthread_mutex_t pthm_sock;
int ttl;
unsigned short ip_id;
bool _is_multicast(string ip);
......
......@@ -36,13 +36,10 @@ using namespace std;
//#undef RTSP_DEBUG_2
#define RTSP_DEBUG_2
#undef RTSP_DEBUG
#undef RTSP_DEBUG_2
#ifdef RTSP_DEBUG
#define D(s_port, a) \
do { \
cerr << __FILE__ << ": " << __FUNCTION__ << ": " << __LINE__ << ": sensor port: " << s_port << " "; \
cerr << __FILE__ << ": " << __FUNCTION__ << ": " << __LINE__ << ": sensor port: " << s_port << " :: "; \
a; \
} while (0)
#else
......@@ -52,7 +49,7 @@ using namespace std;
#ifdef RTSP_DEBUG_2
#define D2(s_port, a) \
do { \
cerr << __FILE__ << ": " << __FUNCTION__ << ": " << __LINE__ << ": sensor port: " << s_port << " "; \
cerr << "p" << s_port << ": "; \
a; \
} while (0)
#else
......@@ -131,11 +128,12 @@ int Streamer::f_handler(void *ptr, RTSP_Server *rtsp_server, RTSP_Server::event
}
int Streamer::update_settings(bool apply) {
D(sensor_port, cerr << "update_settings" << endl);
D2(sensor_port, cerr << "update_settings" << endl);
// check settings, normalize its, return 1 if was changed
// update settings at application if apply = 1 and parameters change isn't on-fly safe, update parameters always
#define _CAN_BE_CHANGED 11
#define _CAN_BE_CHANGED 11
unsigned long changes_array[2 * (_CAN_BE_CHANGED + 1)];
int changes_array_i = 2;
bool params_update = false;
......@@ -366,17 +364,21 @@ int Streamer::update_settings(bool apply) {
int Streamer::handler(RTSP_Server *rtsp_server, RTSP_Server::event event) {
static bool _play = false;
D(sensor_port, cerr << "event: running= " << running << " ");
D2(sensor_port, cerr << "event: running = " << running << endl);
switch (event) {
case RTSP_Server::DESCRIBE:
D2(sensor_port, cerr << "=== RTSP_Server::DESCRIBE ===" << endl);
// update frame size, fps before starting new stream (generating SDP file)
update_settings(true);
break;
case RTSP_Server::PARAMS_WAS_CHANGED:
D2(sensor_port, cerr << "=== RTSP_Server::PARAMS_WAS_CHANGED ===" << endl);
// update frame size, fps before starting new stream (generating SDP file)
return (update_settings(false) || !(params->daemon_enabled()));
case RTSP_Server::PLAY:
D(sensor_port, cerr << "==PLAY==");
D2(sensor_port, cerr << "=== RTSP_Server::PLAY ===" << endl);
if (connected_count == 0) {
int ttl = -1;
if (session->rtp_out.multicast)
......@@ -390,7 +392,7 @@ int Streamer::handler(RTSP_Server *rtsp_server, RTSP_Server::event event) {
running = true;
break;
case RTSP_Server::PAUSE:
D(sensor_port, cerr << "PAUSE");
D2(sensor_port, cerr << "=== RTSP_Server::PAUSE ===" << endl);
connected_count--;
if (connected_count <= 0) {
video->Stop();
......@@ -402,7 +404,7 @@ int Streamer::handler(RTSP_Server *rtsp_server, RTSP_Server::event event) {
}
break;
case RTSP_Server::TEARDOWN:
D(sensor_port, cerr << "TEARDOWN");
D2(sensor_port, cerr << "=== RTSP_Server::TEARDOWN ===" << endl);
if (!running) {
D(sensor_port, cerr << " was not running");
break;
......@@ -418,7 +420,7 @@ int Streamer::handler(RTSP_Server *rtsp_server, RTSP_Server::event event) {
}
break;
case RTSP_Server::RESET:
D(sensor_port, cerr << "RESET");
D2(sensor_port, cerr << "=== RTSP_Server::RESET ===" << endl);
if (!running) {
D(sensor_port, cerr << " was not running");
break;
......@@ -437,7 +439,7 @@ int Streamer::handler(RTSP_Server *rtsp_server, RTSP_Server::event event) {
break;
*/
default:
D(sensor_port, cerr << "unknown == " << event);
D2(sensor_port, cerr << "=== RTSP_Server::UNKNOWN ===" << endl);
break;
}
D(sensor_port, cerr << endl);
......@@ -445,7 +447,9 @@ int Streamer::handler(RTSP_Server *rtsp_server, RTSP_Server::event event) {
}
void Streamer::Main(void) {
D(sensor_port, cerr << "start Main for sensor port " << sensor_port << endl);
D(sensor_port, cerr << "Main() for sensor port " << sensor_port << endl);
int def_port = 20020 + 4 * sensor_port; // +4 because RTP port should be an even number and each stream can occupy 2 ports
string def_ttl = "2";
......@@ -456,19 +460,25 @@ void Streamer::Main(void) {
session->rtp_out.port_audio = def_port + 2;
session->rtp_out.ttl = def_ttl;
rtsp_server = NULL;
while (true) {
/// Check if the streamer is enabled, restart loop after waiting
if (!video->waitDaemonEnabled(-1)) {
sched_yield();
continue; /// may use particular bit instead of the "default" -1
}
// the very first settings update
update_settings(true);
/// Got here if is and was enabled (may use more actions instead of just "continue"
// start RTSP server
D2(sensor_port, cerr << "start server" << endl);
if (rtsp_server == NULL)
D2(sensor_port, cerr << "Starting RTSP server" << endl);
if (rtsp_server == NULL) {
rtsp_server = new RTSP_Server(Streamer::f_handler, (void *) this, params, session);
}
rtsp_server->main();
D2(sensor_port, cerr << "server was stopped" << endl);
D2(sensor_port, cerr << "stop video" << endl);
video->Stop();
......
......@@ -35,6 +35,9 @@
#include <iomanip>
#include "streamer.h"
#include "socket.h"
#include <stdio.h>
using namespace std;
......@@ -48,6 +51,10 @@ using namespace std;
#undef VIDEO_DEBUG // for FPS monitoring
#undef VIDEO_DEBUG_2 // for FPS monitoring
#define VIDEO_DEBUG
#define VIDEO_DEBUG_2
#define VIDEO_DEBUG_3
#ifdef VIDEO_DEBUG
#define D(s_port, a) \
do { \
......@@ -249,6 +256,7 @@ bool Video::isDaemonEnabled(int daemonBit) { // <0 - use default
* @return pointer (offset in circbuf) to the frame start
*/
long Video::getFramePars(struct interframe_params_t *frame_pars, long before, long ptr_before) {
long cur_pointer, p;
long this_pointer = 0;
......@@ -409,7 +417,7 @@ void Video::get_frame_pars(void *frame_pars, unsigned long offset)
if (offset >= METADATA_LEN) {
ptr = &buffer_ptr[BYTE2DW(offset - METADATA_LEN)];
memcpy(frame_pars, ptr, METADATA_LEN);
D3(sensor_port, cerr << "Read interframe params, ptr: " << (void *)ptr << endl);
//D3(sensor_port, cerr << "Read interframe params, ptr: " << (void *)ptr << endl);
} else {
// copy the chunk from the end of the buffer
remainder = METADATA_LEN - offset;
......@@ -425,8 +433,34 @@ void Video::get_frame_pars(void *frame_pars, unsigned long offset)
}
}
timeval debug_time0;
int debug_init_time(){
gettimeofday(&debug_time0, NULL);
return 0;
}
int debug_print_time(char * str){
timeval t0 = debug_time0;
timeval t1;
int d_sec, d_usec;
gettimeofday(&t1, NULL);
d_sec = t1.tv_sec - t0.tv_sec;
d_usec = t1.tv_usec - t0.tv_usec;
if (d_usec < 0) {
d_sec -= 1;
d_usec += 1000000;
}
printf("%s: %d.%06d\n",str,d_sec,d_usec);
return 0;
}
#define USE_REAL_OLD_TIMESTAMP 0
long Video::capture(void) {
long frame_len;
struct interframe_params_t frame_pars;
struct interframe_params_t curr_frame_params;
......@@ -442,13 +476,16 @@ long Video::capture(void) {
}
frameStartByteIndex = lseek(fd_circbuf, LSEEK_CIRC_TOWP, SEEK_END); // byte index in circbuf of the frame start
latestAvailableFrame_ptr = frameStartByteIndex;
lseek(fd_circbuf, LSEEK_CIRC_WAIT, SEEK_END);
frame_ptr = (char *) ((unsigned long) buffer_ptr + latestAvailableFrame_ptr);
frame_len = get_frame_len(latestAvailableFrame_ptr);
/*
D3(sensor_port, cerr << "Frame start byte index: " << frameStartByteIndex <<
", frame pointer: " << (void *)frame_ptr <<
", frame length: " << frame_len << endl);
*/
// read time stamp
unsigned char *ts_ptr = (unsigned char *) ((unsigned long) frame_ptr + (long) (((frame_len + CCAM_MMAP_META + 3) & (~0x1f)) + 32 - CCAM_MMAP_META_SEC));
......@@ -506,10 +543,16 @@ long Video::capture(void) {
}
f_quality = quality;
//debug_print_time("CAPTURE TOTAL TIME");
return frame_len;
}
timeval old_curTime;
timeval old_f_tv;
long Video::process(void) {
int _plen = 1400;
int to_send = _plen;
int _qtables_len = 128 + 4;
......@@ -548,15 +591,22 @@ long Video::process(void) {
uint32_t ts;
ts = timestamp;
ts = htonl(ts);
D(sensor_port, cerr << "This frame's time stamp: " << timestamp << endl);
//D(sensor_port, cerr << "This frame's time stamp: " << f_tv.tv_sec << "." << f_tv.tv_usec << ", calculated rtp ts = " << timestamp << endl);
long offset = 0;
struct iovec iov[4];
int vect_num;
bool first = true;
timeval curTime;
unsigned long pnum = 0;
bool last = false;
while (to_send_len && _play) {
unsigned long pnum = htons(packet_num);
bool last = false;
pnum = htons(packet_num);
last = false;
to_send = _plen;
if (qtables_include && first)
to_send = _plen - _qtables_len;
......@@ -574,6 +624,7 @@ long Video::process(void) {
h[1] = _ptype;
else
h[1] = 0x80 + _ptype;
memcpy((void *) &h[2], (void *) &pnum, 2);
memcpy((void *) &h[4], (void *) &ts, 4);
memcpy((void *) &h[8], (void *) &SSRC, 4);
......@@ -604,6 +655,7 @@ long Video::process(void) {
// send vector
vect_num = 0;
iov[vect_num].iov_base = h;
if (first) {
if (qtables_include) {
iov[vect_num++].iov_len = 24;
......@@ -613,13 +665,49 @@ long Video::process(void) {
iov[vect_num++].iov_len = 20;
}
first = false;
gettimeofday(&curTime, NULL);
//cout << "sys = [" << curTime.tv_sec << "." << curTime.tv_usec << "], img = [" << f_tv.tv_sec << "." << f_tv.tv_usec << "]"<< endl;
int dTime_sec = curTime.tv_sec - old_curTime.tv_sec;
int dTime_usec = curTime.tv_usec - old_curTime.tv_usec;
if (dTime_usec < 0) {
dTime_usec += 1000000;
dTime_sec -= 1;
}
int dTS_sec = f_tv.tv_sec - old_f_tv.tv_sec;
int dTS_usec = f_tv.tv_usec - old_f_tv.tv_usec;
if (dTS_usec < 0) {
dTS_usec += 1000000;
dTS_sec -= 1;
}
/*
printf("\nsys=[%u.%06d], img=[%u.%06d], deltaSys = %d.%06d, deltaImg = %d.%06d\n\n",
curTime.tv_sec, curTime.tv_usec,
f_tv.tv_sec, f_tv.tv_usec,
dTime_sec, dTime_usec,
dTS_sec, dTS_usec
);
*/
if (curTime.tv_usec != old_curTime.tv_usec){
old_curTime = curTime;
old_f_tv = f_tv;
}
} else {
iov[vect_num++].iov_len = 20;
}
if ((data + packet_len) <= buffer_ptr_end) {
iov[vect_num].iov_base = data;
iov[vect_num++].iov_len = packet_len;
data += packet_len;
} else {
// current packet rolls over the end of the buffer, split it and set data pointer to the buffer start
int overshoot = (data + packet_len) - (unsigned char *)(buffer_ptr + BYTE2DW(buffer_length));
......@@ -633,11 +721,28 @@ long Video::process(void) {
", packet_len_first: " << packet_len_first << endl);
data = (unsigned char *)buffer_ptr + overshoot;
}
rtp_socket->send_vect(iov, vect_num);
//rtp_socket->poll(&rtp_socket);
//cerr << "Polling out for " << rtp_socket->fd << endl;
//Socket::pollout(s,1000);
//debug_init_time();
if (!rtp_socket->send_vect(iov, vect_num)){
D3(sensor_port, cerr << "Error while sending, packets sent: " << packet_num << endl);
}
//usleep(1);
//debug_print_time("send_vect time: ");
packet_num++;
offset += packet_len;
//D3(sensor_port, cerr << "Packets sent so far: " << packet_num << endl);
//usleep(1);
}
D3(sensor_port, cerr << "Packets sent: " << packet_num << endl);
//D3(sensor_port, cerr << "Packets sent: " << packet_num << endl);
return 1;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment