/**
 * @file streamer.cpp
 * @brief Streamer implementation
 * @copyright Copyright (C) 2017 Elphel Inc.
 * @author AUTHOR <EMAIL>
 *
 * @par License:
 *  This program is free software: you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation, either version 3 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include "streamer.h"
#include "helpers.h"
#include "socket.h"

#include <cstdio>
#include <iostream>

using namespace std;

// Debug output switches. Both levels are turned off here; to enable a level,
// replace the corresponding #undef with a #define.
#undef RTSP_DEBUG
#undef RTSP_DEBUG_2

// D(s_port, a): when RTSP_DEBUG is defined, print a
// "file: function: line: sensor port: N " prefix to cerr and then execute
// statement `a`. Otherwise expands to an empty do/while so that a call is
// always exactly one statement.
#ifdef RTSP_DEBUG
	#define D(s_port, a) \
	do { \
		cerr << __FILE__ << ": " << __FUNCTION__ << ": " << __LINE__ << ": sensor port: " << (s_port) << " "; \
		a; \
	} while (0)
#else
	#define D(s_port, a) do { } while (0)
#endif

// D2(s_port, a): same as D(), but controlled by RTSP_DEBUG_2.
#ifdef RTSP_DEBUG_2
	#define D2(s_port, a) \
	do { \
		cerr << __FILE__ << ": " << __FUNCTION__ << ": " << __LINE__ << ": sensor port: " << (s_port) << " "; \
		a; \
	} while (0)
#else
	#define D2(s_port, a) do { } while (0)
#endif

62
Streamer::Streamer(const map<string, string> &_args, int port_num, bool audio_en) {
63 64
	sensor_port = port_num;
	_streamer = this;
65
	session = new Session();
66
	params = new Parameters(sensor_port);
67 68
	args = _args;
	audio = NULL;
69
	session->process_audio = audio_en;
70 71 72 73 74 75
	session->audio.sample_rate = 0;
	session->audio.channels = 0;
	session->rtp_out.ip_custom = false;
	session->rtp_out.ip_cached = 0;
	session->video.fps_scale = 1;
	audio_init();
76
	video = new Video(sensor_port, params);
77
	if (opt_present("f")) {
78 79
		float fps = 0;
		fps = atof(args["f"].c_str());
80
		if (fps < 0.1)
81
			fps = 0;
82
		D(sensor_port, cout << "use fps: " << fps << endl);
83 84 85 86
		video->fps(fps);
	}
	rtsp_server = NULL;
	connected_count = 0;
87 88 89 90 91 92 93

	// DEBUG FEATURE: self-enable audio processing, this should be done elsewhere, probably from camvc
	unsigned long snd_en = 0;
	if (session->process_audio)
		snd_en = 1;
	unsigned long params_array[2] = {P_STROP_AUDIO_EN, snd_en};
	params->setPValue(params_array, 2);
94 95 96
}

void Streamer::audio_init(void) {
97
	if (audio != NULL) {
98
		D(sensor_port, cerr << "delete audio" << endl);
99 100
		delete audio;
	}
101
	D(sensor_port, cout << "audio_enabled == " << session->process_audio << endl);
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
	if (session->process_audio) {
		string dev_name = "";
		map<string, string>::iterator args_it;
		if ((args_it = args.find("D")) != args.end())
			dev_name = args_it->second;
		audio = new Audio(sensor_port, session->process_audio, params, dev_name, session->audio.sample_rate, session->audio.channels);
		if (audio->present() && session->process_audio) {
			session->process_audio = true;
			session->audio.type = audio->ptype();
			session->audio.sample_rate = audio->sample_rate();
			session->audio.channels = audio->channels();
		} else {
			session->process_audio = false;
			session->audio.type = -1;
			session->audio.sample_rate = 0;
			session->audio.channels = 0;
		}
119 120 121 122 123 124
	}
}

/**
 * @brief Release every object allocated by this streamer.
 *
 * The RTSP server is destroyed first because it was constructed with pointers
 * to `params` and `session`; video and audio were constructed with `params`,
 * so `params` is released after them.
 */
Streamer::~Streamer(void) {
	delete rtsp_server;   // allocated in Main(); deleting NULL is a no-op
	delete video;
	delete audio;
	delete params;
	delete session;       // allocated in the constructor
}

int Streamer::f_handler(void *ptr, RTSP_Server *rtsp_server, RTSP_Server::event event) {
	Streamer *__this = (Streamer *)ptr;
	return __this->handler(rtsp_server, event);
}

int Streamer::update_settings(bool apply) {
134
	D(sensor_port, cerr << "update_settings" << endl);
135 136 137 138 139 140 141 142 143 144 145 146

	// check settings, normalize its, return 1 if was changed
	// update settings at application if apply = 1 and parameters change isn't on-fly safe, update parameters always
#define _CAN_BE_CHANGED	11
	unsigned long changes_array[2 * (_CAN_BE_CHANGED + 1)];
	int changes_array_i = 2;
	bool params_update = false;

	// update application settings
//	if(connected_count == 0) {

	// don't change "on the fly" if someone already connected - like mcast clients
147
//	Parameters *params = Parameters::instance();
148 149 150 151 152 153 154
	// multicast parameters
	// - multicast ip
	// new values
	bool result = false;
	//----------------
	// frame skip, or FPS scale
	int frames_skip = params->getGPValue(P_STROP_FRAMES_SKIP);
155 156
	if (frames_skip < 0 || frames_skip > 0xFFFF) {
		if (frames_skip < 0)
157
			frames_skip = 0;
158
		if (frames_skip < 0xFFFF)
159 160
			frames_skip = 0xFFFF;
		changes_array[changes_array_i + 0] = P_STROP_FRAMES_SKIP;
161
		changes_array[changes_array_i + 1] = (unsigned long) frames_skip;
162 163 164 165
		changes_array_i += 2;
		params_update = true;
	}
	frames_skip += 1; // convert to fps_scale format;
166 167
	if (frames_skip != session->video.fps_scale) {
		if (apply)
168 169 170
			session->video.fps_scale = frames_skip;
//cerr << "session->video.fps_scale = " << session->video.fps_scale << endl;
		result = true;
171
	}
172 173 174 175
	//----------------
	// transport parameters
	bool transport_was_changed = false;
	bool param_multicast = params->getGPValue(P_STROP_MCAST_EN);
176
	if (param_multicast || session->rtp_out.multicast) {
177
		// multicast/unicast
178 179
		if (param_multicast != session->rtp_out.multicast) {
			if (apply)
180 181 182 183 184 185 186
				session->rtp_out.multicast = param_multicast;
			transport_was_changed = true;
		}
		// IP
		unsigned long ip = params->getGPValue(P_STROP_MCAST_IP);
		bool ip_was_changed = false;
		// switch custom/default IP
187
		if ((ip == 0) && session->rtp_out.ip_custom)
188
			ip_was_changed = true;
189
		if ((ip != 0) && !session->rtp_out.ip_custom)
190 191
			ip_was_changed = true;
		// change of custom IP
192 193
		if ((ip != 0) && session->rtp_out.ip_custom)
			if (ip != session->rtp_out.ip_cached)
194
				ip_was_changed = true;
195 196
		if (ip_was_changed) {
			if (ip != 0) {
197 198 199
				struct in_addr a;
				uint32_t a_min = ntohl(inet_addr("224.0.0.0"));
				uint32_t a_max = ntohl(inet_addr("239.255.255.255"));
200
				if (a_min > a_max) {
201 202 203 204
					uint32_t a = a_min;
					a_min = a_max;
					a_max = a;
				}
205
				if (ip < a_min)
206
					ip = a_min;
207
				if (ip > a_max)
208 209
					ip = a_max;
				a.s_addr = htonl(ip);
210 211
				D(sensor_port, cerr << "multicast ip asked: " << inet_ntoa(a) << endl);
				if (apply) {
212 213 214 215 216 217 218 219 220
					session->rtp_out.ip_cached = ip;
					session->rtp_out.ip_custom = true;
					session->rtp_out.ip = inet_ntoa(a);
					changes_array[changes_array_i + 0] = P_STROP_MCAST_IP;
					changes_array[changes_array_i + 1] = ip;
					changes_array_i += 2;
				}
			} else {
				struct in_addr a = Socket::mcast_from_local();
221 222
				D(sensor_port, cerr << "multicast ip generated: " << inet_ntoa(a) << endl);
				if (apply) {
223 224 225 226 227 228
					session->rtp_out.ip_custom = false;
					session->rtp_out.ip = inet_ntoa(a);
				}
			}
			transport_was_changed = true;
		}
229 230
//D(		if(apply))
		D(sensor_port, if (apply) cerr << "actual multicast IP: " << session->rtp_out.ip << endl);
231 232
		// port
		int port = params->getGPValue(P_STROP_MCAST_PORT);
233 234
		if (port != session->rtp_out.port_video) {
			if (port < 1024)
235
				port = 1024;
236
			if (port > 65532)
237
				port = 65532;
238
			if (apply) {
239 240 241 242 243 244 245 246 247 248
				session->rtp_out.port_video = port;
				session->rtp_out.port_audio = port + 2;
				changes_array[changes_array_i + 0] = P_STROP_MCAST_PORT;
				changes_array[changes_array_i + 1] = session->rtp_out.port_video;
				changes_array_i += 2;
			}
			transport_was_changed = true;
		}
		// ttl
		int ttl = params->getGPValue(P_STROP_MCAST_TTL);
249 250
		if (ttl != atoi(session->rtp_out.ttl.c_str())) {
			if (ttl < 1)
251
				ttl = 1;
252
			if (ttl > 15)
253
				ttl = 15;
254
			if (apply) {
255 256 257 258 259 260 261 262 263 264
				char buf[8];
				sprintf(buf, "%d", ttl);
				session->rtp_out.ttl = buf;
				changes_array[changes_array_i + 0] = P_STROP_MCAST_TTL;
				changes_array[changes_array_i + 1] = ttl;
				changes_array_i += 2;
			}
			transport_was_changed = true;
		}
	}
265
	if (transport_was_changed)
266 267 268 269 270 271 272 273 274
		params_update = true;

	//-----------------
	// audio parameters
	bool audio_was_changed = false;
	bool audio_proc = true;
	bool f_audio_rate = false;
	bool f_audio_channels = false;
	// - enabled/disabled
275
	if (params->getGPValue(P_STROP_AUDIO_EN) == 0)
276 277 278 279
		audio_proc = false;
	int audio_rate = params->getGPValue(P_STROP_AUDIO_RATE);
	int audio_channels = params->getGPValue(P_STROP_AUDIO_CHANNEL);

280
	if (audio_proc != session->process_audio)
281
		audio_was_changed = true;
282
	if (audio_rate != session->audio.sample_rate)
283
		f_audio_rate = true;
284
	if (audio_channels != session->audio.channels)
285
		f_audio_channels = true;
286
	if ((audio_proc || session->process_audio) && (f_audio_rate || f_audio_channels))
287
		audio_was_changed = true;
288
	if (apply) {
289
		bool audio_restarted = false;
290
		if (audio_was_changed) {
291 292 293
			session->process_audio = audio_proc;
			session->audio.sample_rate = audio_rate;
			session->audio.channels = audio_channels;
294
			D2(sensor_port, cerr << "Audio was changed. Should restart it" << endl);
295 296 297
			audio_init();
			audio_restarted = true;
			// if audio enable was asked, check what soundcard really is connected
298 299
			if (audio_proc) {
				if (!audio->present()) {
300 301 302 303 304 305
					session->process_audio = false;
					changes_array[changes_array_i + 0] = P_STROP_AUDIO_EN;
					changes_array[changes_array_i + 1] = 0;
					changes_array_i += 2;
				}
			}
306
			if (f_audio_rate) {
307 308 309 310
				changes_array[changes_array_i + 0] = P_STROP_AUDIO_RATE;
				changes_array[changes_array_i + 1] = session->audio.sample_rate;
				changes_array_i += 2;
			}
311
			if (f_audio_channels) {
312 313 314 315 316 317
				changes_array[changes_array_i + 0] = P_STROP_AUDIO_CHANNEL;
				changes_array[changes_array_i + 1] = session->audio.channels;
				changes_array_i += 2;
			}
		}
		// was started before new client - must reinit audio
318
		if (!audio_restarted && session->process_audio)
319 320 321 322 323
			audio_init();
	}
	result = result || audio_was_changed || transport_was_changed;

	// apply volume if audio is enabled, and volume was changed
324 325 326

	if (session->process_audio) {
		if (audio->present()) {
327 328 329 330
			// check volume
			long volume = audio->volume();
			int audio_volume = params->getGPValue(P_AUDIO_CAPTURE_VOLUME);
			// and apply it
331
			if (audio_volume != volume) {
332 333 334 335 336 337 338 339 340 341 342
				audio->set_volume(audio_volume);
				changes_array[changes_array_i + 0] = P_AUDIO_CAPTURE_VOLUME;
				changes_array[changes_array_i + 1] = audio->volume();
				changes_array_i += 2;
				params_update = true;
			}
		}
	}

	// update array of changes
	// set frame to update
343
	if (apply || params_update) {
344 345 346 347 348 349 350
		changes_array[0] = FRAMEPARS_SETFRAME;
		changes_array[1] = params->getGPValue(G_THIS_FRAME) + 1;
		params->setPValue(changes_array, changes_array_i);
	}
	//------------------------------
	// update current image settings
//	if(apply) {
351 352 353 354 355 356 357 358 359
	// here - create new function from where update all settings
	struct video_desc_t video_desc = video->get_current_desc();
	if (video_desc.valid) {
		session->video.width = video_desc.width;
		session->video.height = video_desc.height;
		session->video.fps = video_desc.fps;
		session->video.fps /= session->video.fps_scale;
	}
	session->video.type = video->ptype();
360 361
//	}

362
	if (result)
363 364 365 366 367 368
		return 1;
	return 0;
}

/**
 * @brief Process an event reported by the RTSP server.
 *
 * - DESCRIBE: refresh and apply the settings.
 * - PARAMS_WAS_CHANGED: report whether settings changed or the daemon is disabled.
 * - PLAY: start video (and audio, if present) for the first client and count the client.
 * - PAUSE / TEARDOWN: drop one client; stop streaming when none are left.
 * - RESET: stop streaming and forget all clients.
 *
 * @param rtsp_server server that raised the event (not used here)
 * @param event       event to process
 * @return for PARAMS_WAS_CHANGED, non-zero if update_settings(false) reported a
 *         change or params->daemon_enabled() is false; 0 for every other event
 */
int Streamer::handler(RTSP_Server *rtsp_server, RTSP_Server::event event) {
	D(sensor_port, cerr << "event: running= " << running << " ");
	switch (event) {
	case RTSP_Server::DESCRIBE:
		// update frame size, fps before starting new stream (generating SDP file)
		update_settings(true);
		break;
	case RTSP_Server::PARAMS_WAS_CHANGED:
		// only detect changes here, do not apply them
		return (update_settings(false) || !(params->daemon_enabled()));
	case RTSP_Server::PLAY:
		D(sensor_port, cerr << "==PLAY==");
		if (connected_count == 0) {
			// first client: start the streams; ttl stays -1 for unicast
			int ttl = -1;
			if (session->rtp_out.multicast)
				ttl = atoi(session->rtp_out.ttl.c_str());
			video->Start(session->rtp_out.ip, session->rtp_out.port_video, session->video.fps_scale, ttl);
			if (audio != NULL)
				audio->Start(session->rtp_out.ip, session->rtp_out.port_audio, ttl);
		}
		connected_count++;
		running = true;
		break;
	case RTSP_Server::PAUSE:
		D(sensor_port, cerr << "PAUSE");
		connected_count--;
		if (connected_count <= 0) {
			// last client is gone - stop the streams
			video->Stop();
			if (audio != NULL)
				audio->Stop();
			connected_count = 0;
			running = false;
		}
		break;
	case RTSP_Server::TEARDOWN:
		D(sensor_port, cerr << "TEARDOWN");
		if (!running) {
			D(sensor_port, cerr << " was not running");
			break;
		}
		connected_count--;
		if (connected_count <= 0) {
			// last client is gone - stop the streams
			video->Stop();
			if (audio != NULL)
				audio->Stop();
			connected_count = 0;
			running = false;
		}
		break;
	case RTSP_Server::RESET:
		D(sensor_port, cerr << "RESET");
		if (!running) {
			D(sensor_port, cerr << " was not running");
			break;
		}
		// stop unconditionally, regardless of the number of clients
		video->Stop();
		if (audio != NULL)
			audio->Stop();
		connected_count = 0;
		running = false;
		break;
	default:
		D(sensor_port, cerr << "unknown == " << event);
		break;
	}
	D(sensor_port, cerr << endl);
	return 0;
}

void Streamer::Main(void) {
448
	D(sensor_port, cerr << "start Main for sensor port " << sensor_port << endl);
449
	int def_port = 20020 + 4 * sensor_port;                     // +4 because RTP port should be an even number and each stream can occupy 2 ports
450 451 452 453 454 455 456 457 458
	string def_ttl = "2";

	session->rtp_out.ip_cached = 0;
	session->rtp_out.ip_custom = true;
	session->rtp_out.ip = "0";
	session->rtp_out.port_video = def_port;
	session->rtp_out.port_audio = def_port + 2;
	session->rtp_out.ttl = def_ttl;
	rtsp_server = NULL;
459
	while (true) {
460
		/// Check if the streamer is enabled, restart loop after waiting
461
		if (!video->waitDaemonEnabled(-1)) {
462 463 464 465 466 467
			sched_yield();
			continue; /// may use particular bit instead of the "default" -1
		}
		update_settings(true);
		/// Got here if is and was enabled (may use more actions instead of just "continue"
		// start RTSP server
468
		D2(sensor_port, cerr << "start server" << endl);
469 470
		if (rtsp_server == NULL)
			rtsp_server = new RTSP_Server(Streamer::f_handler, (void *) this, params, session);
471
		rtsp_server->main();
472 473
		D2(sensor_port, cerr << "server was stopped" << endl);
		D2(sensor_port, cerr << "stop video" << endl);
474
		video->Stop();
475
		D2(sensor_port, cerr << "stop audio" << endl);
476
		if (audio != NULL) {
477 478
			audio->Stop();
			// free audio resource - other app can use soundcard
479
			D2(sensor_port, cerr << "delete audio" << endl);
480 481 482 483 484
			delete audio;
			audio = NULL;
		}
	}
}
485 486 487 488 489 490 491

void *Streamer::pthread_f(void *_this)
{
	Streamer *__this = (Streamer *)_this;
	__this->Main();
	return NULL;
}