streamer.cpp 13.7 KB
Newer Older
1
/**
2 3 4
 * @file streamer.cpp
 * @brief Streamer implementation
 * @copyright Copyright (C) 2017 Elphel Inc.
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
 * @author AUTHOR <EMAIL>
 *
 * @par License:
 *  This program is free software: you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation, either version 3 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

22 23 24 25 26 27 28 29 30 31 32 33
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include "streamer.h"
#include "helpers.h"
#include "socket.h"

#include <iostream>

using namespace std;

34 35 36 37
// Debug output switches: comment out a #define below to turn the matching
// macro (D for RTSP_DEBUG, D2 for RTSP_DEBUG_2) into a no-op.
//#undef RTSP_DEBUG
#define RTSP_DEBUG
//#undef RTSP_DEBUG_2
#define RTSP_DEBUG_2

/**
 * D(s_port, a): when RTSP_DEBUG is defined, write a
 * "<file>: <function>: <line>: sensor port: <s_port> " prefix to std::cerr and
 * then execute statement @e a (normally a stream insertion that finishes the line).
 * In both configurations the macro expands to a single do { } while (0) statement,
 * so it is safe in unbraced if/else bodies and must be followed by a semicolon.
 */
#ifdef RTSP_DEBUG
	#define D(s_port, a) \
	do { \
		std::cerr << __FILE__ << ": " << __FUNCTION__ << ": " << __LINE__ << ": sensor port: " << (s_port) << " "; \
		a; \
	} while (0)
#else
	#define D(s_port, a) do { } while (0)
#endif

/**
 * D2(s_port, a): same as D(), but controlled by RTSP_DEBUG_2.
 */
#ifdef RTSP_DEBUG_2
	#define D2(s_port, a) \
	do { \
		std::cerr << __FILE__ << ": " << __FUNCTION__ << ": " << __LINE__ << ": sensor port: " << (s_port) << " "; \
		a; \
	} while (0)
#else
	#define D2(s_port, a) do { } while (0)
#endif

59
//Streamer *Streamer::_streamer = NULL;
60

61 62 63
/**
 * @brief Create a streamer for one sensor port.
 *
 * Allocates the Session, Parameters, Audio (through audio_init()) and Video
 * objects and resets the session's audio and RTP output fields before any
 * settings are read. The RTSP server itself is created later, in Main().
 *
 * @param _args    option name/value pairs; if opt_present("f") is true, the
 *                 value of "f" is parsed with atof() and passed to Video::fps()
 *                 (values below 0.1 are passed as 0)
 * @param port_num sensor port number, stored in sensor_port and handed to
 *                 Parameters and Video
 */
Streamer::Streamer(const map<string, string> &_args, int port_num) {
	sensor_port = port_num;
	_streamer = this;
	args = _args;

	// handler() reads these before any PLAY event has assigned them,
	// so they must have defined values from the start.
	running = false;
	connected_count = 0;
	rtsp_server = NULL;

	session = new Session();
	params = new Parameters(sensor_port);

	// audio_init() deletes a non-NULL audio object, so it has to start out NULL
	audio = NULL;
	session->process_audio = false;
	session->audio.sample_rate = 0;
	session->audio.channels = 0;
	session->rtp_out.ip_custom = false;
	session->rtp_out.ip_cached = 0;
	session->video.fps_scale = 1;
	audio_init();

	video = new Video(sensor_port, params);
	if (opt_present("f")) {
		float fps = atof(args["f"].c_str());
		if (fps < 0.1)
			fps = 0;
		// message goes to cerr, the same stream the D() prefix is written to
		D(sensor_port, cerr << "use fps: " << fps << endl);
		video->fps(fps);
	}
}

void Streamer::audio_init(void) {
89
	if (audio != NULL) {
90
		D(sensor_port, cerr << "delete audio" << endl);
91 92
		delete audio;
	}
93
	D(sensor_port, cout << "audio_enabled == " << session->process_audio << endl);
94 95
	audio = new Audio(session->process_audio, params, session->audio.sample_rate, session->audio.channels);
	if (audio->present() && session->process_audio) {
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110
		session->process_audio = true;
		session->audio.type = audio->ptype();
		session->audio.sample_rate = audio->sample_rate();
		session->audio.channels = audio->channels();
	} else {
		session->process_audio = false;
		session->audio.type = -1;
		session->audio.sample_rate = 0;
		session->audio.channels = 0;
	}
}

/**
 * @brief Release every object this streamer allocated.
 *
 * The RTSP server is created in Main() with pointers to this object, params
 * and session, so it is destroyed first, while everything it may still refer
 * to is alive. The session (allocated in the constructor) goes last.
 * Deleting a NULL pointer is a no-op, so this is safe when Main() never ran
 * or the audio object was already released there.
 *
 * NOTE(review): assumes neither RTSP_Server nor any other class frees
 * session/params itself - confirm against their destructors.
 */
Streamer::~Streamer(void) {
	delete rtsp_server;
	delete video;
	delete audio;
	delete params;
	delete session;
}

int Streamer::f_handler(void *ptr, RTSP_Server *rtsp_server, RTSP_Server::event event) {
	Streamer *__this = (Streamer *)ptr;
	return __this->handler(rtsp_server, event);
}

int Streamer::update_settings(bool apply) {
120
	D(sensor_port, cerr << "update_settings" << endl);
121 122 123 124 125 126 127 128 129 130 131 132

	// check settings, normalize its, return 1 if was changed
	// update settings at application if apply = 1 and parameters change isn't on-fly safe, update parameters always
#define _CAN_BE_CHANGED	11
	unsigned long changes_array[2 * (_CAN_BE_CHANGED + 1)];
	int changes_array_i = 2;
	bool params_update = false;

	// update application settings
//	if(connected_count == 0) {

	// don't change "on the fly" if someone already connected - like mcast clients
133
//	Parameters *params = Parameters::instance();
134 135 136 137 138 139 140
	// multicast parameters
	// - multicast ip
	// new values
	bool result = false;
	//----------------
	// frame skip, or FPS scale
	int frames_skip = params->getGPValue(P_STROP_FRAMES_SKIP);
141 142
	if (frames_skip < 0 || frames_skip > 0xFFFF) {
		if (frames_skip < 0)
143
			frames_skip = 0;
144
		if (frames_skip < 0xFFFF)
145 146
			frames_skip = 0xFFFF;
		changes_array[changes_array_i + 0] = P_STROP_FRAMES_SKIP;
147
		changes_array[changes_array_i + 1] = (unsigned long) frames_skip;
148 149 150 151
		changes_array_i += 2;
		params_update = true;
	}
	frames_skip += 1; // convert to fps_scale format;
152 153
	if (frames_skip != session->video.fps_scale) {
		if (apply)
154 155 156
			session->video.fps_scale = frames_skip;
//cerr << "session->video.fps_scale = " << session->video.fps_scale << endl;
		result = true;
157
	}
158 159 160 161
	//----------------
	// transport parameters
	bool transport_was_changed = false;
	bool param_multicast = params->getGPValue(P_STROP_MCAST_EN);
162
	if (param_multicast || session->rtp_out.multicast) {
163
		// multicast/unicast
164 165
		if (param_multicast != session->rtp_out.multicast) {
			if (apply)
166 167 168 169 170 171 172
				session->rtp_out.multicast = param_multicast;
			transport_was_changed = true;
		}
		// IP
		unsigned long ip = params->getGPValue(P_STROP_MCAST_IP);
		bool ip_was_changed = false;
		// switch custom/default IP
173
		if ((ip == 0) && session->rtp_out.ip_custom)
174
			ip_was_changed = true;
175
		if ((ip != 0) && !session->rtp_out.ip_custom)
176 177
			ip_was_changed = true;
		// change of custom IP
178 179
		if ((ip != 0) && session->rtp_out.ip_custom)
			if (ip != session->rtp_out.ip_cached)
180
				ip_was_changed = true;
181 182
		if (ip_was_changed) {
			if (ip != 0) {
183 184 185
				struct in_addr a;
				uint32_t a_min = ntohl(inet_addr("224.0.0.0"));
				uint32_t a_max = ntohl(inet_addr("239.255.255.255"));
186
				if (a_min > a_max) {
187 188 189 190
					uint32_t a = a_min;
					a_min = a_max;
					a_max = a;
				}
191
				if (ip < a_min)
192
					ip = a_min;
193
				if (ip > a_max)
194 195
					ip = a_max;
				a.s_addr = htonl(ip);
196 197
				D(sensor_port, cerr << "multicast ip asked: " << inet_ntoa(a) << endl);
				if (apply) {
198 199 200 201 202 203 204 205 206
					session->rtp_out.ip_cached = ip;
					session->rtp_out.ip_custom = true;
					session->rtp_out.ip = inet_ntoa(a);
					changes_array[changes_array_i + 0] = P_STROP_MCAST_IP;
					changes_array[changes_array_i + 1] = ip;
					changes_array_i += 2;
				}
			} else {
				struct in_addr a = Socket::mcast_from_local();
207 208
				D(sensor_port, cerr << "multicast ip generated: " << inet_ntoa(a) << endl);
				if (apply) {
209 210 211 212 213 214
					session->rtp_out.ip_custom = false;
					session->rtp_out.ip = inet_ntoa(a);
				}
			}
			transport_was_changed = true;
		}
215 216
//D(		if(apply))
		D(sensor_port, if (apply) cerr << "actual multicast IP: " << session->rtp_out.ip << endl);
217 218
		// port
		int port = params->getGPValue(P_STROP_MCAST_PORT);
219 220
		if (port != session->rtp_out.port_video) {
			if (port < 1024)
221
				port = 1024;
222
			if (port > 65532)
223
				port = 65532;
224
			if (apply) {
225 226 227 228 229 230 231 232 233 234
				session->rtp_out.port_video = port;
				session->rtp_out.port_audio = port + 2;
				changes_array[changes_array_i + 0] = P_STROP_MCAST_PORT;
				changes_array[changes_array_i + 1] = session->rtp_out.port_video;
				changes_array_i += 2;
			}
			transport_was_changed = true;
		}
		// ttl
		int ttl = params->getGPValue(P_STROP_MCAST_TTL);
235 236
		if (ttl != atoi(session->rtp_out.ttl.c_str())) {
			if (ttl < 1)
237
				ttl = 1;
238
			if (ttl > 15)
239
				ttl = 15;
240
			if (apply) {
241 242 243 244 245 246 247 248 249 250
				char buf[8];
				sprintf(buf, "%d", ttl);
				session->rtp_out.ttl = buf;
				changes_array[changes_array_i + 0] = P_STROP_MCAST_TTL;
				changes_array[changes_array_i + 1] = ttl;
				changes_array_i += 2;
			}
			transport_was_changed = true;
		}
	}
251
	if (transport_was_changed)
252 253 254 255 256 257 258 259 260
		params_update = true;

	//-----------------
	// audio parameters
	bool audio_was_changed = false;
	bool audio_proc = true;
	bool f_audio_rate = false;
	bool f_audio_channels = false;
	// - enabled/disabled
261
	if (params->getGPValue(P_STROP_AUDIO_EN) == 0)
262 263 264 265
		audio_proc = false;
	int audio_rate = params->getGPValue(P_STROP_AUDIO_RATE);
	int audio_channels = params->getGPValue(P_STROP_AUDIO_CHANNEL);

266
	if (audio_proc != session->process_audio)
267
		audio_was_changed = true;
268
	if (audio_rate != session->audio.sample_rate)
269
		f_audio_rate = true;
270
	if (audio_channels != session->audio.channels)
271
		f_audio_channels = true;
272
	if ((audio_proc || session->process_audio) && (f_audio_rate || f_audio_channels))
273
		audio_was_changed = true;
274
	if (apply) {
275
		bool audio_restarted = false;
276
		if (audio_was_changed) {
277 278 279
			session->process_audio = audio_proc;
			session->audio.sample_rate = audio_rate;
			session->audio.channels = audio_channels;
280
			D2(sensor_port, cerr << "Audio was changed. Should restart it" << endl);
281 282 283
			audio_init();
			audio_restarted = true;
			// if audio enable was asked, check what soundcard really is connected
284 285
			if (audio_proc) {
				if (!audio->present()) {
286 287 288 289 290 291
					session->process_audio = false;
					changes_array[changes_array_i + 0] = P_STROP_AUDIO_EN;
					changes_array[changes_array_i + 1] = 0;
					changes_array_i += 2;
				}
			}
292
			if (f_audio_rate) {
293 294 295 296
				changes_array[changes_array_i + 0] = P_STROP_AUDIO_RATE;
				changes_array[changes_array_i + 1] = session->audio.sample_rate;
				changes_array_i += 2;
			}
297
			if (f_audio_channels) {
298 299 300 301 302 303
				changes_array[changes_array_i + 0] = P_STROP_AUDIO_CHANNEL;
				changes_array[changes_array_i + 1] = session->audio.channels;
				changes_array_i += 2;
			}
		}
		// was started before new client - must reinit audio
304
		if (!audio_restarted && session->process_audio)
305 306 307 308 309
			audio_init();
	}
	result = result || audio_was_changed || transport_was_changed;

	// apply volume if audio is enabled, and volume was changed
310 311 312

	if (session->process_audio) {
		if (audio->present()) {
313 314 315 316
			// check volume
			long volume = audio->volume();
			int audio_volume = params->getGPValue(P_AUDIO_CAPTURE_VOLUME);
			// and apply it
317
			if (audio_volume != volume) {
318 319 320 321 322 323 324 325 326 327 328
				audio->set_volume(audio_volume);
				changes_array[changes_array_i + 0] = P_AUDIO_CAPTURE_VOLUME;
				changes_array[changes_array_i + 1] = audio->volume();
				changes_array_i += 2;
				params_update = true;
			}
		}
	}

	// update array of changes
	// set frame to update
329
	if (apply || params_update) {
330 331 332 333 334 335 336
		changes_array[0] = FRAMEPARS_SETFRAME;
		changes_array[1] = params->getGPValue(G_THIS_FRAME) + 1;
		params->setPValue(changes_array, changes_array_i);
	}
	//------------------------------
	// update current image settings
//	if(apply) {
337 338 339 340 341 342 343 344 345
	// here - create new function from where update all settings
	struct video_desc_t video_desc = video->get_current_desc();
	if (video_desc.valid) {
		session->video.width = video_desc.width;
		session->video.height = video_desc.height;
		session->video.fps = video_desc.fps;
		session->video.fps /= session->video.fps_scale;
	}
	session->video.type = video->ptype();
346 347
//	}

348
	if (result)
349 350 351 352 353 354
		return 1;
	return 0;
}

/**
 * @brief React to an event reported by the RTSP server.
 *
 * Keeps a count of playing clients in connected_count: video/audio are
 * started when the first client issues PLAY and stopped when the count drops
 * to zero (PAUSE / TEARDOWN) or unconditionally on RESET.
 *
 * @param rtsp_server server that raised the event (not used here)
 * @param event       event to process
 * @return for PARAMS_WAS_CHANGED: non-zero if update_settings(false) reported
 *         a change or params->daemon_enabled() is false; 0 for every other event
 */
int Streamer::handler(RTSP_Server *rtsp_server, RTSP_Server::event event) {
	D(sensor_port, cerr << "event: running= " << running << " ");
	switch (event) {
	case RTSP_Server::DESCRIBE: /// Update frame size, fps before starting new stream (generating SDP file)
		update_settings(true);
		break;
	case RTSP_Server::PARAMS_WAS_CHANGED: /// Only check (do not apply) whether settings changed or the daemon got disabled
		return (update_settings(false) || !(params->daemon_enabled()));
	case RTSP_Server::PLAY:
		D(sensor_port, cerr << "==PLAY==");
		// only the first client actually starts the streams
		if (connected_count == 0) {
			int ttl = -1; // stays -1 for a non-multicast session
			if (session->rtp_out.multicast)
				ttl = atoi(session->rtp_out.ttl.c_str());
			video->Start(session->rtp_out.ip, session->rtp_out.port_video, session->video.fps_scale,
					ttl);
			if (audio != NULL)
				audio->Start(session->rtp_out.ip, session->rtp_out.port_audio, ttl);
		}
		connected_count++;
		running = true;
		break;
	case RTSP_Server::PAUSE:
		D(sensor_port, cerr << "PAUSE");
		connected_count--;
		if (connected_count <= 0) {
			video->Stop();
			if (audio != NULL)
				audio->Stop();
			connected_count = 0;
			running = false;
		}
		break;
	case RTSP_Server::TEARDOWN:
		D(sensor_port, cerr << "TEARDOWN");
		if (!running) {
			D(sensor_port, cerr << " was not running");
			break;
		}
		connected_count--;
		if (connected_count <= 0) {
			video->Stop();
			if (audio != NULL)
				audio->Stop();
			connected_count = 0;
			running = false;
		}
		break;
	case RTSP_Server::RESET:
		D(sensor_port, cerr << "RESET");
		if (!running) {
			D(sensor_port, cerr << " was not running");
			break;
		}
		// drop all clients at once
		video->Stop();
		if (audio != NULL)
			audio->Stop();
		connected_count = 0;
		running = false;
		break;
	default:
		D(sensor_port, cerr << "unknown == " << event);
		break;
	}
	D(sensor_port, cerr << endl);
	return 0;
}

void Streamer::Main(void) {
433
	D(sensor_port, cerr << "start Main for sensor port " << sensor_port << endl);
434 435 436 437 438 439 440 441 442 443
	int def_port = 20020;
	string def_ttl = "2";

	session->rtp_out.ip_cached = 0;
	session->rtp_out.ip_custom = true;
	session->rtp_out.ip = "0";
	session->rtp_out.port_video = def_port;
	session->rtp_out.port_audio = def_port + 2;
	session->rtp_out.ttl = def_ttl;
	rtsp_server = NULL;
444
	while (true) {
445
		/// Check if the streamer is enabled, restart loop after waiting
446
		if (!video->waitDaemonEnabled(-1)) {
447 448 449 450 451 452
			sched_yield();
			continue; /// may use particular bit instead of the "default" -1
		}
		update_settings(true);
		/// Got here if is and was enabled (may use more actions instead of just "continue"
		// start RTSP server
453
		D2(sensor_port, cerr << "start server" << endl);
454 455
		if (rtsp_server == NULL)
			rtsp_server = new RTSP_Server(Streamer::f_handler, (void *) this, params, session);
456
		rtsp_server->main();
457 458
		D2(sensor_port, cerr << "server was stopped" << endl);
		D2(sensor_port, cerr << "stop video" << endl);
459
		video->Stop();
460
		D2(sensor_port, cerr << "stop audio" << endl);
461
		if (audio != NULL) {
462 463
			audio->Stop();
			// free audio resource - other app can use soundcard
464
			D2(sensor_port, cerr << "delete audio" << endl);
465 466 467 468 469
			delete audio;
			audio = NULL;
		}
	}
}
470 471 472 473 474 475 476

void *Streamer::pthread_f(void *_this)
{
	Streamer *__this = (Streamer *)_this;
	__this->Main();
	return NULL;
}