streamer.cpp 14.3 KB
Newer Older
1
/**
 * @file streamer.cpp
 * @brief Streamer implementation
 * @copyright Copyright (C) 2017 Elphel Inc.
 * @author AUTHOR <EMAIL>
 *
 * @par License:
 *  This program is free software: you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation, either version 3 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

22 23 24 25 26 27 28 29 30 31 32 33
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include "streamer.h"
#include "helpers.h"
#include "socket.h"

#include <iostream>

using namespace std;

34 35 36 37
//#undef RTSP_DEBUG
#define RTSP_DEBUG
//#undef RTSP_DEBUG_2
#define RTSP_DEBUG_2

/*
 * Debug trace helpers.
 *
 * D(s_port, a) and D2(s_port, a) write a "file: function: line: sensor port: N "
 * prefix to std::cerr and then execute statement `a` (normally a stream
 * insertion that prints the message itself). D is controlled by RTSP_DEBUG,
 * D2 by RTSP_DEBUG_2.
 *
 * Both the enabled and the disabled variants expand to a single
 * `do { ... } while (0)` statement, so `D(...);` is always exactly one
 * statement and is safe inside un-braced if/else bodies regardless of which
 * debug level is compiled in. The stream is spelled std::cerr so the prefix
 * does not depend on a using-directive being in effect at the call site.
 */
#ifdef RTSP_DEBUG
	#define D(s_port, a) \
	do { \
		std::cerr << __FILE__ << ": " << __FUNCTION__ << ": " << __LINE__ << ": sensor port: " << (s_port) << " "; \
		a; \
	} while (0)
#else
	#define D(s_port, a) do { } while (0)
#endif

#ifdef RTSP_DEBUG_2
	#define D2(s_port, a) \
	do { \
		std::cerr << __FILE__ << ": " << __FUNCTION__ << ": " << __LINE__ << ": sensor port: " << (s_port) << " "; \
		a; \
	} while (0)
#else
	#define D2(s_port, a) do { } while (0)
#endif

59
/**
 * @brief Create the streamer for one sensor port.
 *
 * Allocates the Session, Parameters and Video objects, tries to open audio
 * through audio_init() and writes the resulting audio-enable state back to
 * the P_STROP_AUDIO_EN parameter.
 *
 * @param _args     option map copied into the object; key "f" (frame rate,
 *                  parsed with atof) is read here, key "D" is read by
 *                  audio_init()
 * @param port_num  sensor port this streamer serves
 * @param audio_en  initial request to process audio; audio_init() may clear
 *                  session->process_audio if no audio device is present
 */
Streamer::Streamer(const map<string, string> &_args, int port_num, bool audio_en) {
	sensor_port = port_num;
	_streamer = this;
	session = new Session();
	params = new Parameters(sensor_port);
	args = _args;
	audio = NULL;
	session->process_audio = audio_en;
	session->audio.sample_rate = 0;
	session->audio.channels = 0;
	session->rtp_out.ip_custom = false;
	session->rtp_out.ip_cached = 0;
	session->video.fps_scale = 1;
	audio_init();
	video = new Video(sensor_port, params);
	if (opt_present("f")) {
		float fps = 0;
		fps = atof(args["f"].c_str());
		// values below 0.1 are passed on as 0
		if (fps < 0.1)
			fps = 0;
		D(sensor_port, cout << "use fps: " << fps << endl);
		video->fps(fps);
	}
	rtsp_server = NULL;
	connected_count = 0;
	// handler() prints `running` on every event and tests it on TEARDOWN/RESET,
	// possibly before any PLAY has assigned it, so it needs a defined start value
	running = false;

	// DEBUG FEATURE: self-enable audio processing, this should be done elsewhere, probably from camvc
	unsigned long snd_en = 0;
	if (session->process_audio)
		snd_en = 1;
	unsigned long params_array[2] = {P_STROP_AUDIO_EN, snd_en};
	params->setPValue(params_array, 2);
}

void Streamer::audio_init(void) {
94
	if (audio != NULL) {
95
		D(sensor_port, cerr << "delete audio" << endl);
96 97
		delete audio;
	}
98
	D(sensor_port, cout << "audio_enabled == " << session->process_audio << endl);
99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
	if (session->process_audio) {
		string dev_name = "";
		map<string, string>::iterator args_it;
		if ((args_it = args.find("D")) != args.end())
			dev_name = args_it->second;
		audio = new Audio(sensor_port, session->process_audio, params, dev_name, session->audio.sample_rate, session->audio.channels);
		if (audio->present() && session->process_audio) {
			session->process_audio = true;
			session->audio.type = audio->ptype();
			session->audio.sample_rate = audio->sample_rate();
			session->audio.channels = audio->channels();
		} else {
			session->process_audio = false;
			session->audio.type = -1;
			session->audio.sample_rate = 0;
			session->audio.channels = 0;
		}
116 117 118 119 120 121
	}
}

/**
 * @brief Release every object this streamer allocated.
 *
 * The RTSP server (created in Main() with pointers to `params` and `session`)
 * goes first, then video and audio (both constructed with `params`), and only
 * then the Parameters and Session objects themselves. All pointers may be
 * NULL; deleting a null pointer is a no-op.
 */
Streamer::~Streamer(void) {
	delete rtsp_server;
	delete video;
	delete audio;
	delete params;
	delete session;
}

int Streamer::f_handler(void *ptr, RTSP_Server *rtsp_server, RTSP_Server::event event) {
	Streamer *__this = (Streamer *)ptr;
	return __this->handler(rtsp_server, event);
}

int Streamer::update_settings(bool apply) {
131
	D(sensor_port, cerr << "update_settings" << endl);
132 133 134 135 136 137 138 139 140 141 142 143

	// check settings, normalize its, return 1 if was changed
	// update settings at application if apply = 1 and parameters change isn't on-fly safe, update parameters always
#define _CAN_BE_CHANGED	11
	unsigned long changes_array[2 * (_CAN_BE_CHANGED + 1)];
	int changes_array_i = 2;
	bool params_update = false;

	// update application settings
//	if(connected_count == 0) {

	// don't change "on the fly" if someone already connected - like mcast clients
144
//	Parameters *params = Parameters::instance();
145 146 147 148 149 150 151
	// multicast parameters
	// - multicast ip
	// new values
	bool result = false;
	//----------------
	// frame skip, or FPS scale
	int frames_skip = params->getGPValue(P_STROP_FRAMES_SKIP);
152 153
	if (frames_skip < 0 || frames_skip > 0xFFFF) {
		if (frames_skip < 0)
154
			frames_skip = 0;
155
		if (frames_skip < 0xFFFF)
156 157
			frames_skip = 0xFFFF;
		changes_array[changes_array_i + 0] = P_STROP_FRAMES_SKIP;
158
		changes_array[changes_array_i + 1] = (unsigned long) frames_skip;
159 160 161 162
		changes_array_i += 2;
		params_update = true;
	}
	frames_skip += 1; // convert to fps_scale format;
163 164
	if (frames_skip != session->video.fps_scale) {
		if (apply)
165 166 167
			session->video.fps_scale = frames_skip;
//cerr << "session->video.fps_scale = " << session->video.fps_scale << endl;
		result = true;
168
	}
169 170 171 172
	//----------------
	// transport parameters
	bool transport_was_changed = false;
	bool param_multicast = params->getGPValue(P_STROP_MCAST_EN);
173
	if (param_multicast || session->rtp_out.multicast) {
174
		// multicast/unicast
175 176
		if (param_multicast != session->rtp_out.multicast) {
			if (apply)
177 178 179 180 181 182 183
				session->rtp_out.multicast = param_multicast;
			transport_was_changed = true;
		}
		// IP
		unsigned long ip = params->getGPValue(P_STROP_MCAST_IP);
		bool ip_was_changed = false;
		// switch custom/default IP
184
		if ((ip == 0) && session->rtp_out.ip_custom)
185
			ip_was_changed = true;
186
		if ((ip != 0) && !session->rtp_out.ip_custom)
187 188
			ip_was_changed = true;
		// change of custom IP
189 190
		if ((ip != 0) && session->rtp_out.ip_custom)
			if (ip != session->rtp_out.ip_cached)
191
				ip_was_changed = true;
192 193
		if (ip_was_changed) {
			if (ip != 0) {
194 195 196
				struct in_addr a;
				uint32_t a_min = ntohl(inet_addr("224.0.0.0"));
				uint32_t a_max = ntohl(inet_addr("239.255.255.255"));
197
				if (a_min > a_max) {
198 199 200 201
					uint32_t a = a_min;
					a_min = a_max;
					a_max = a;
				}
202
				if (ip < a_min)
203
					ip = a_min;
204
				if (ip > a_max)
205 206
					ip = a_max;
				a.s_addr = htonl(ip);
207 208
				D(sensor_port, cerr << "multicast ip asked: " << inet_ntoa(a) << endl);
				if (apply) {
209 210 211 212 213 214 215 216 217
					session->rtp_out.ip_cached = ip;
					session->rtp_out.ip_custom = true;
					session->rtp_out.ip = inet_ntoa(a);
					changes_array[changes_array_i + 0] = P_STROP_MCAST_IP;
					changes_array[changes_array_i + 1] = ip;
					changes_array_i += 2;
				}
			} else {
				struct in_addr a = Socket::mcast_from_local();
218 219
				D(sensor_port, cerr << "multicast ip generated: " << inet_ntoa(a) << endl);
				if (apply) {
220 221 222 223 224 225
					session->rtp_out.ip_custom = false;
					session->rtp_out.ip = inet_ntoa(a);
				}
			}
			transport_was_changed = true;
		}
226 227
//D(		if(apply))
		D(sensor_port, if (apply) cerr << "actual multicast IP: " << session->rtp_out.ip << endl);
228 229
		// port
		int port = params->getGPValue(P_STROP_MCAST_PORT);
230 231
		if (port != session->rtp_out.port_video) {
			if (port < 1024)
232
				port = 1024;
233
			if (port > 65532)
234
				port = 65532;
235
			if (apply) {
236 237 238 239 240 241 242 243 244 245
				session->rtp_out.port_video = port;
				session->rtp_out.port_audio = port + 2;
				changes_array[changes_array_i + 0] = P_STROP_MCAST_PORT;
				changes_array[changes_array_i + 1] = session->rtp_out.port_video;
				changes_array_i += 2;
			}
			transport_was_changed = true;
		}
		// ttl
		int ttl = params->getGPValue(P_STROP_MCAST_TTL);
246 247
		if (ttl != atoi(session->rtp_out.ttl.c_str())) {
			if (ttl < 1)
248
				ttl = 1;
249
			if (ttl > 15)
250
				ttl = 15;
251
			if (apply) {
252 253 254 255 256 257 258 259 260 261
				char buf[8];
				sprintf(buf, "%d", ttl);
				session->rtp_out.ttl = buf;
				changes_array[changes_array_i + 0] = P_STROP_MCAST_TTL;
				changes_array[changes_array_i + 1] = ttl;
				changes_array_i += 2;
			}
			transport_was_changed = true;
		}
	}
262
	if (transport_was_changed)
263 264 265 266 267 268 269 270 271
		params_update = true;

	//-----------------
	// audio parameters
	bool audio_was_changed = false;
	bool audio_proc = true;
	bool f_audio_rate = false;
	bool f_audio_channels = false;
	// - enabled/disabled
272
	if (params->getGPValue(P_STROP_AUDIO_EN) == 0)
273 274 275 276
		audio_proc = false;
	int audio_rate = params->getGPValue(P_STROP_AUDIO_RATE);
	int audio_channels = params->getGPValue(P_STROP_AUDIO_CHANNEL);

277
	if (audio_proc != session->process_audio)
278
		audio_was_changed = true;
279
	if (audio_rate != session->audio.sample_rate)
280
		f_audio_rate = true;
281
	if (audio_channels != session->audio.channels)
282
		f_audio_channels = true;
283
	if ((audio_proc || session->process_audio) && (f_audio_rate || f_audio_channels))
284
		audio_was_changed = true;
285
	if (apply) {
286
		bool audio_restarted = false;
287
		if (audio_was_changed) {
288 289 290
			session->process_audio = audio_proc;
			session->audio.sample_rate = audio_rate;
			session->audio.channels = audio_channels;
291
			D2(sensor_port, cerr << "Audio was changed. Should restart it" << endl);
292 293 294
			audio_init();
			audio_restarted = true;
			// if audio enable was asked, check what soundcard really is connected
295 296
			if (audio_proc) {
				if (!audio->present()) {
297 298 299 300 301 302
					session->process_audio = false;
					changes_array[changes_array_i + 0] = P_STROP_AUDIO_EN;
					changes_array[changes_array_i + 1] = 0;
					changes_array_i += 2;
				}
			}
303
			if (f_audio_rate) {
304 305 306 307
				changes_array[changes_array_i + 0] = P_STROP_AUDIO_RATE;
				changes_array[changes_array_i + 1] = session->audio.sample_rate;
				changes_array_i += 2;
			}
308
			if (f_audio_channels) {
309 310 311 312 313 314
				changes_array[changes_array_i + 0] = P_STROP_AUDIO_CHANNEL;
				changes_array[changes_array_i + 1] = session->audio.channels;
				changes_array_i += 2;
			}
		}
		// was started before new client - must reinit audio
315
		if (!audio_restarted && session->process_audio)
316 317 318 319 320
			audio_init();
	}
	result = result || audio_was_changed || transport_was_changed;

	// apply volume if audio is enabled, and volume was changed
321 322 323

	if (session->process_audio) {
		if (audio->present()) {
324 325 326 327
			// check volume
			long volume = audio->volume();
			int audio_volume = params->getGPValue(P_AUDIO_CAPTURE_VOLUME);
			// and apply it
328
			if (audio_volume != volume) {
329 330 331 332 333 334 335 336 337 338 339
				audio->set_volume(audio_volume);
				changes_array[changes_array_i + 0] = P_AUDIO_CAPTURE_VOLUME;
				changes_array[changes_array_i + 1] = audio->volume();
				changes_array_i += 2;
				params_update = true;
			}
		}
	}

	// update array of changes
	// set frame to update
340
	if (apply || params_update) {
341 342 343 344 345 346 347
		changes_array[0] = FRAMEPARS_SETFRAME;
		changes_array[1] = params->getGPValue(G_THIS_FRAME) + 1;
		params->setPValue(changes_array, changes_array_i);
	}
	//------------------------------
	// update current image settings
//	if(apply) {
348 349 350 351 352 353 354 355 356
	// here - create new function from where update all settings
	struct video_desc_t video_desc = video->get_current_desc();
	if (video_desc.valid) {
		session->video.width = video_desc.width;
		session->video.height = video_desc.height;
		session->video.fps = video_desc.fps;
		session->video.fps /= session->video.fps_scale;
	}
	session->video.type = video->ptype();
357 358
//	}

359
	if (result)
360 361 362 363 364 365
		return 1;
	return 0;
}

/**
 * @brief React to an event reported by the RTSP server.
 *
 * - DESCRIBE: refresh and apply settings before the stream is described.
 * - PARAMS_WAS_CHANGED: re-check settings without applying them.
 * - PLAY: start video (and audio, if present) for the first client and count
 *   the client.
 * - PAUSE / TEARDOWN: drop one client; stop streaming when none are left.
 *   TEARDOWN is ignored while not running.
 * - RESET: stop streaming and forget all clients; ignored while not running.
 *
 * @param rtsp_server  server that raised the event (not used here)
 * @param event        event code
 * @return for PARAMS_WAS_CHANGED: non-zero if update_settings(false) reports a
 *         change or params->daemon_enabled() is false; 0 for every other event
 */
int Streamer::handler(RTSP_Server *rtsp_server, RTSP_Server::event event) {
	D(sensor_port, cerr << "event: running= " << running << " ");
	switch (event) {
	case RTSP_Server::DESCRIBE:
		// update frame size, fps before starting new stream (generating SDP file)
		update_settings(true);
		break;
	case RTSP_Server::PARAMS_WAS_CHANGED:
		// only detect changes here (apply = false); also report when the daemon is no longer enabled
		return (update_settings(false) || !(params->daemon_enabled()));
	case RTSP_Server::PLAY:
		D(sensor_port, cerr << "==PLAY==");
		if (connected_count == 0) {
			// first client: start the streams; ttl stays -1 unless multicast is on
			int ttl = -1;
			if (session->rtp_out.multicast)
				ttl = atoi(session->rtp_out.ttl.c_str());
			video->Start(session->rtp_out.ip, session->rtp_out.port_video, session->video.fps_scale, ttl);
			if (audio != NULL)
				audio->Start(session->rtp_out.ip, session->rtp_out.port_audio, ttl);
		}
		connected_count++;
		running = true;
		break;
	case RTSP_Server::PAUSE:
		D(sensor_port, cerr << "PAUSE");
		connected_count--;
		if (connected_count <= 0) {
			video->Stop();
			if (audio != NULL)
				audio->Stop();
			connected_count = 0;
			running = false;
		}
		break;
	case RTSP_Server::TEARDOWN:
		D(sensor_port, cerr << "TEARDOWN");
		if (!running) {
			D(sensor_port, cerr << " was not running");
			break;
		}
		connected_count--;
		if (connected_count <= 0) {
			video->Stop();
			if (audio != NULL)
				audio->Stop();
			connected_count = 0;
			running = false;
		}
		break;
	case RTSP_Server::RESET:
		D(sensor_port, cerr << "RESET");
		if (!running) {
			D(sensor_port, cerr << " was not running");
			break;
		}
		video->Stop();
		if (audio != NULL)
			audio->Stop();
		connected_count = 0;
		running = false;
		break;
	default:
		D(sensor_port, cerr << "unknown == " << event);
		break;
	}
	D(sensor_port, cerr << endl);
	return 0;
}

void Streamer::Main(void) {
445
	D(sensor_port, cerr << "start Main for sensor port " << sensor_port << endl);
446
	int def_port = 20020 + 4 * sensor_port;                     // +4 because RTP port should be an even number and each stream can occupy 2 ports
447 448 449 450 451 452 453 454 455
	string def_ttl = "2";

	session->rtp_out.ip_cached = 0;
	session->rtp_out.ip_custom = true;
	session->rtp_out.ip = "0";
	session->rtp_out.port_video = def_port;
	session->rtp_out.port_audio = def_port + 2;
	session->rtp_out.ttl = def_ttl;
	rtsp_server = NULL;
456
	while (true) {
457
		/// Check if the streamer is enabled, restart loop after waiting
458
		if (!video->waitDaemonEnabled(-1)) {
459 460 461 462 463 464
			sched_yield();
			continue; /// may use particular bit instead of the "default" -1
		}
		update_settings(true);
		/// Got here if is and was enabled (may use more actions instead of just "continue"
		// start RTSP server
465
		D2(sensor_port, cerr << "start server" << endl);
466 467
		if (rtsp_server == NULL)
			rtsp_server = new RTSP_Server(Streamer::f_handler, (void *) this, params, session);
468
		rtsp_server->main();
469 470
		D2(sensor_port, cerr << "server was stopped" << endl);
		D2(sensor_port, cerr << "stop video" << endl);
471
		video->Stop();
472
		D2(sensor_port, cerr << "stop audio" << endl);
473
		if (audio != NULL) {
474 475
			audio->Stop();
			// free audio resource - other app can use soundcard
476
			D2(sensor_port, cerr << "delete audio" << endl);
477 478 479 480 481
			delete audio;
			audio = NULL;
		}
	}
}
482 483 484 485 486 487 488

void *Streamer::pthread_f(void *_this)
{
	Streamer *__this = (Streamer *)_this;
	__this->Main();
	return NULL;
}