/**
 * @file rtp_stream.cpp
 * @brief Base class for RTP streams
 * @copyright Copyright (C) 2017 Elphel Inc.
 * @author AUTHOR <EMAIL>
 *
 * @par License:
 *  This program is free software: you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation, either version 3 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include <arpa/inet.h>
#include <pthread.h>
#include <semaphore.h>

#include <cstring>
#include <iostream>

#include "rtp_stream.h"
#include "helper.h"

using namespace std;

/// Canonical name placed into the CNAME item of RTCP SDES packets.
#define CNAME "elphel393"

// Debug tracing switch: remove (or #undef) RTP_DEBUG to compile the traces out.
//#undef RTP_DEBUG
#define RTP_DEBUG

/**
 * @brief Debug trace helper.
 *
 * With RTP_DEBUG defined, D(s_port, a) writes a "file: function: line: sensor port: N "
 * prefix to cerr and then executes statement @e a (normally a "cerr << ..." expression).
 * Without RTP_DEBUG the macro expands to nothing, so @e a is never evaluated.
 */
#if defined(RTP_DEBUG)
	#define D(s_port, a) \
	do { \
		cerr << __FILE__ << ": " << __FUNCTION__ << ": " << __LINE__ \
		     << ": sensor port: " << (s_port) << " "; \
		a; \
	} while (0)
#else
	#define D(s_port, a)
#endif

/**
 * @brief Construct an idle stream: not playing, no sockets, no worker thread yet.
 *
 * Initializes the flow mutex and the 'play' semaphore (count 0) that the
 * other methods of this class use.
 */
RTP_Stream::RTP_Stream(void) {
	// stream identity and counters
	stream_name = "unknown";
	packet_num = 0;
	_play = false;

	// sockets are created in Start()
	rtp_socket = NULL;
	rtcp_socket = NULL;

	// worker thread is created in init_pthread(); -1 marks "not created"
	pth_id = -1;

	// synchronization primitives
	pthread_mutex_init(&pthm_flow, NULL);
	sem_init(&sem_play, 0, 0);
}

/**
 * @brief Cancel the worker thread (if one was started), release the sockets
 *        and destroy the synchronization primitives.
 */
RTP_Stream::~RTP_Stream() {
	// pth_id holds the pthread_create() result: 0 means the thread was really
	// created. The constructor's -1 and any positive error code both mean
	// 'pth' is not a valid thread handle and must not be cancelled.
	if (pth_id == 0)
		pthread_cancel(pth);
	// deleting a null pointer is a no-op, so no explicit checks are needed
	delete rtp_socket;
	delete rtcp_socket;
	sem_destroy(&sem_play);
	pthread_mutex_destroy(&pthm_flow);
}

/**
 * @brief Create the detached worker thread running pthread_f().
 * @param __this pointer passed to the thread entry point; pthread_f() casts it
 *               back to RTP_Stream * and calls thread() on it
 *
 * The pthread_create() result is stored in pth_id (0 on success, an error
 * number otherwise).
 */
void RTP_Stream::init_pthread(void *__this) {
	pthread_attr_t tattr;
	pthread_attr_init(&tattr);
	pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED);
	pth_id = pthread_create(&pth, &tattr, RTP_Stream::pthread_f, (void *)__this);
	// the attribute object is only needed during creation; release it
	pthread_attr_destroy(&tattr);
}

void *RTP_Stream::pthread_f(void *_this) {
	RTP_Stream *__this = (RTP_Stream *)_this;
	return __this->thread();
}

/**
 * @brief Start streaming: open the RTP/RTCP sockets and wake up the worker thread.
 * @param ip   destination address handed to both sockets
 * @param port RTP port; RTCP uses port + 1
 * @param ttl  TTL value handed to both sockets
 *
 * Does nothing if the stream is already playing.
 */
void RTP_Stream::Start(string ip, int port, int ttl) {
	D(sensor_port, cerr << " new " << stream_name << " UDP socket at port: " << port << endl);
	pthread_mutex_lock(&pthm_flow);
	if (_play) {
		// already running - nothing to do
		pthread_mutex_unlock(&pthm_flow);
		return;
	}

	rtp_socket = new Socket(ip, port, Socket::TYPE_UDP, ttl);
	rtcp_socket = new Socket(ip, port + 1, Socket::TYPE_UDP, ttl);

	// reset RTCP bookkeeping
	rtcp_delay = 2000000;	// in usec (2500000 and 1250000 were tried earlier)
	rtcp_tv.tv_usec = 0;
	rtcp_tv.tv_sec = 0;
	rtp_octets = 0;
	rtp_packets = 0;

	_play = true;
	/// 'play' event: release the thread waiting on the semaphore
	sem_post(&sem_play);
	pthread_mutex_unlock(&pthm_flow);
}

void RTP_Stream::Stop(void) {
101
	D(sensor_port, cerr << __FILE__<< ":"<< __FUNCTION__ << ":" <<__LINE__ <<endl);
102 103
//cerr << "RTP_Stream::Stop() for stream " << stream_name << " - begin" << endl;
	pthread_mutex_lock(&pthm_flow);
104
	if (_play) {
105 106 107 108 109
//cerr << "RTP_Stream::Stop() for stream " << stream_name << " - in progress" << endl;
		/// reset semaphore
		sem_init(&sem_play, 0, 0);
		_play = false;
//		delete rtcp_socket;
110
		if (rtp_socket != NULL) {
111 112 113
			delete rtp_socket;
			rtp_socket = NULL;
		}
114
		if (rtcp_socket != NULL) {
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
			delete rtcp_socket;
			rtcp_socket = NULL;
		}
	}
	pthread_mutex_unlock(&pthm_flow);
//cerr << "RTP_Stream::Stop() for stream " << stream_name << " - end" << endl;
}

/**
 * @brief Thread that invokes the video (and audio too) frame acquisition/ transmission
 *        In the current implementation video process() is blocking, but may be made
 *        non-blocking again later (through poll()). In addition to bool result
 *        (false if no frames are available, there are now long process(), with
 *        additional <0 result for the video frame change/stream shut down detected
 *        in Video class.
 * @return never
 */
void *RTP_Stream::thread(void) {
133 134
	D(sensor_port, cerr << "RTP_Stream::thread(void)" << endl);
	for (;;) {
135
		pthread_mutex_lock(&pthm_flow);
136
		if (_play) {
137
			long f = process();
138
			if (f > 0)
139 140 141
				rtcp();
			// process() and rtcp() use sockets
			pthread_mutex_unlock(&pthm_flow);
142 143
			if (f < 0) {
				D(sensor_port, cerr << __FILE__<< ":"<< __FUNCTION__ << ":" <<__LINE__<< "process exception detected: " << f << endl);
144 145 146 147 148 149 150 151 152 153 154 155 156
//				cerr << "Stop() from thread for stream " << stream_name << endl;
				Stop();
			}
		} else { /// wait for 'play' event semaphore
			pthread_mutex_unlock(&pthm_flow);
			sem_wait(&sem_play);
		}
	}
	return NULL;
}

void RTP_Stream::rtcp(void) {
	// check time for next one RTCP...
157
	if (f_tv.tv_sec == 0 && f_tv.tv_usec == 0)
158 159
		return;
	long td = time_delta_us(f_tv, rtcp_tv);
160
	if (td < 0) {
161 162 163
		rtcp_tv = f_tv;
		return;
	}
164
	if (td < rtcp_delay)
165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180
		return;
	rtcp_tv = f_tv;
	rtcp_send_sdes();
	rtcp_send_sr();
}

void RTP_Stream::rtcp_send_sr(void) {
	char packet[8 + 20]; // by RTP RFC 3550, for SR RTCP packet needed 8 + 20 bytes
	int packet_len = sizeof(packet);
	uint16_t us;
	uint32_t ul;

	// RTCP header
	packet[0] = 0x81;
	packet[1] = 200;	// SR
	us = htons(((packet_len) / 4) - 1);
181 182
	memcpy((void *) &packet[2], (void *) &us, 2);
	memcpy((void *) &packet[4], (void *) &SSRC, 4);
183 184
	// NTP timestamp is a fixed point 32.32 format time
	ul = htonl(f_tv.tv_sec);
185
	memcpy((void *) &packet[8], (void *) &ul, 4);
186 187 188 189
	double d = f_tv.tv_usec;
	d /= 1000000.0;
	d *= 65536.0;
	d *= 4096.0;
190 191
	uint32_t f = (uint32_t) d;
	if (f > 0x0FFFFFFF)
192 193 194
		f = 0x0FFFFFFF;
	f <<= 4;
	ul = htonl(f);
195
	memcpy((void *) &packet[12], (void *) &ul, 4);
196
	ul = htonl(timestamp);
197
	memcpy((void *) &packet[16], (void *) &ul, 4);
198
	ul = htonl(rtp_packets);
199
	memcpy((void *) &packet[20], (void *) &ul, 4);
200
	ul = htonl(rtp_octets);
201
	memcpy((void *) &packet[24], (void *) &ul, 4);
202
	rtcp_socket->send(packet, packet_len);
203 204
	D(sensor_port, cerr << "stream name: " << stream_name << ", f_tv: " << f_tv.tv_sec << ":" << f_tv.tv_usec
			<< ", timestamp: " << timestamp << endl);
205 206 207 208
}

void RTP_Stream::rtcp_send_sdes(void) {
	char packet[8 + 4 + 128]; // by RTP RFC 3550, for SDES RTCP packet needed 8 + 4 + ... bytes, 
209
	// so get additional 128 bytes for 126 CNAME field
210
	int packet_len = 0;
211
	int padding = 0;
212 213 214 215

	uint16_t us;
	const char *cname = CNAME;
	int cname_len = strlen(cname);
216
	bzero((void *) packet, 140); //8+4+128
217 218 219 220

	// RTCP header
	packet[0] = 0x81;
	packet[1] = 202;
221
	memcpy((void *) &packet[4], (void *) &SSRC, 4);
222 223 224 225
	packet_len += 8;

	// SDES fields
	packet[8] = 0x01;
226
	memcpy((void *) &packet[10], (void *) cname, cname_len);
227
	packet_len += 2; // + cname_len;
228 229 230 231 232 233 234
	// calculate common length SDES
	padding = (cname_len + 2) % 4;
	if (padding)
		cname_len += (4 - padding);
	packet[9] = cname_len;
	// each chunk  MUST be terminated by one or more null octets(RFC3350)
	packet_len += (cname_len + 4);
235 236

	us = htons((packet_len / 4) - 1);
237
	memcpy((void *) &packet[2], (void *) &us, 2);
238 239 240

	rtcp_socket->send(packet, packet_len);
}