1/*
2 * Copyright (c) 2002, 2003 Marcus Overhagen <Marcus@Overhagen.de>
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining
5 * a copy of this software and associated documentation files or portions
6 * thereof (the "Software"), to deal in the Software without restriction,
7 * including without limitation the rights to use, copy, modify, merge,
8 * publish, distribute, sublicense, and/or sell copies of the Software,
9 * and to permit persons to whom the Software is furnished to do so, subject
10 * to the following conditions:
11 *
12 *  * Redistributions of source code must retain the above copyright notice,
13 *    this list of conditions and the following disclaimer.
14 *
15 *  * Redistributions in binary form must reproduce the above copyright notice
16 *    in the  binary, as well as this list of conditions and the following
17 *    disclaimer in the documentation and/or other materials provided with
18 *    the distribution.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
21 * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
23 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26 * THE SOFTWARE.
27 */
28
29
30#include "BufferCache.h"
31#include <BufferConsumer.h>
32
33#include <stdlib.h>
34#include <string.h>
35
36#include <AutoDeleter.h>
37#include <BufferProducer.h>
38#include <BufferGroup.h>
39#include <Buffer.h>
40#include <TimeSource.h>
41
42#include <MediaDebug.h>
43#include <MediaMisc.h>
44#include <DataExchange.h>
45
46
47
48BBufferConsumer::~BBufferConsumer()
49{
50	CALLED();
51	delete fBufferCache;
52	delete fDeleteBufferGroup;
53}
54
55
56// #pragma mark - public BBufferConsumer
57
58
59media_type
60BBufferConsumer::ConsumerType()
61{
62	CALLED();
63	return fConsumerType;
64}
65
66
67/*static*/ status_t
68BBufferConsumer::RegionToClipData(const BRegion* region, int32* _format,
69	int32 *_size, void* data)
70{
71	CALLED();
72
73	int count = *_size / sizeof(int16);
74	status_t status = BBufferProducer::clip_region_to_shorts(region,
75		static_cast<int16 *>(data), count, &count);
76
77	*_size = count * sizeof(int16);
78	*_format = BBufferProducer::B_CLIP_SHORT_RUNS;
79
80	return status;
81}
82
83
84// #pragma mark - protected BBufferConsumer
85
86
87BBufferConsumer::BBufferConsumer(media_type consumerType)
88	:
89	BMediaNode("called by BBufferConsumer"),
90	fConsumerType(consumerType),
91	fBufferCache(new BPrivate::BufferCache),
92	fDeleteBufferGroup(0)
93{
94	CALLED();
95
96	AddNodeKind(B_BUFFER_CONSUMER);
97}
98
99
100/*static*/ void
101BBufferConsumer::NotifyLateProducer(const media_source& whatSource,
102	bigtime_t howMuch, bigtime_t performanceTime)
103{
104	CALLED();
105	if (IS_INVALID_SOURCE(whatSource))
106		return;
107
108	producer_late_notice_received_command command;
109	command.source = whatSource;
110	command.how_much = howMuch;
111	command.performance_time = performanceTime;
112
113	SendToPort(whatSource.port, PRODUCER_LATE_NOTICE_RECEIVED, &command,
114		sizeof(command));
115}
116
117
118status_t
119BBufferConsumer::SetVideoClippingFor(const media_source& output,
120	const media_destination& destination, const int16* shorts, int32 shortCount,
121	const media_video_display_info& display, void* userData, int32* _changeTag,
122	void *_reserved_)
123{
124	CALLED();
125	if (IS_INVALID_SOURCE(output))
126		return B_MEDIA_BAD_SOURCE;
127	if (IS_INVALID_DESTINATION(destination))
128		return B_MEDIA_BAD_DESTINATION;
129	if (shortCount > int(B_MEDIA_MESSAGE_SIZE
130			- sizeof(producer_video_clipping_changed_command)) / 2) {
131		debugger("BBufferConsumer::SetVideoClippingFor short_count too large "
132			"(8000 limit)\n");
133	}
134
135	producer_video_clipping_changed_command* command;
136	size_t size = sizeof(producer_video_clipping_changed_command)
137		+ shortCount * sizeof(short);
138	command
139		= static_cast<producer_video_clipping_changed_command*>(malloc(size));
140	if (command == NULL)
141		return B_NO_MEMORY;
142
143	command->source = output;
144	command->destination = destination;
145	command->display = display;
146	command->user_data = userData;
147	command->change_tag = NewChangeTag();
148	command->short_count = shortCount;
149	memcpy(command->shorts, shorts, shortCount * sizeof(short));
150	if (_changeTag != NULL)
151		*_changeTag = command->change_tag;
152
153	status_t status = SendToPort(output.port, PRODUCER_VIDEO_CLIPPING_CHANGED,
154		command, size);
155
156	free(command);
157	return status;
158}
159
160
161status_t
162BBufferConsumer::SetOutputEnabled(const media_source &source,
163								  const media_destination &destination,
164								  bool enabled,
165								  void *user_data,
166								  int32 *change_tag,
167								  void *_reserved_)
168{
169	CALLED();
170	if (IS_INVALID_SOURCE(source))
171		return B_MEDIA_BAD_SOURCE;
172	if (IS_INVALID_DESTINATION(destination))
173		return B_MEDIA_BAD_DESTINATION;
174
175	producer_enable_output_command command;
176
177	command.source = source;
178	command.destination = destination;
179	command.enabled = enabled;
180	command.user_data = user_data;
181	command.change_tag = NewChangeTag();
182	if (change_tag != NULL)
183		*change_tag = command.change_tag;
184
185	return SendToPort(source.port, PRODUCER_ENABLE_OUTPUT, &command, sizeof(command));
186}
187
188
189status_t
190BBufferConsumer::RequestFormatChange(const media_source &source,
191									 const media_destination &destination,
192									 const media_format &to_format,
193									 void *user_data,
194									 int32 *change_tag,
195									 void *_reserved_)
196{
197	CALLED();
198	if (IS_INVALID_SOURCE(source))
199		return B_MEDIA_BAD_SOURCE;
200	if (IS_INVALID_DESTINATION(destination))
201		return B_MEDIA_BAD_DESTINATION;
202
203	producer_format_change_requested_command command;
204
205	command.source = source;
206	command.destination = destination;
207	command.format = to_format;
208	command.user_data = user_data;
209	command.change_tag = NewChangeTag();
210	if (change_tag != NULL)
211		*change_tag = command.change_tag;
212
213	return SendToPort(source.port, PRODUCER_FORMAT_CHANGE_REQUESTED, &command, sizeof(command));
214}
215
216
217status_t
218BBufferConsumer::RequestAdditionalBuffer(const media_source &source,
219										 BBuffer *prev_buffer,
220										 void *_reserved)
221{
222	CALLED();
223	if (IS_INVALID_SOURCE(source))
224		return B_MEDIA_BAD_SOURCE;
225
226	producer_additional_buffer_requested_command command;
227
228	command.source = source;
229	command.prev_buffer = prev_buffer->ID();
230	command.prev_time = 0;
231	command.has_seek_tag = false;
232	//command.prev_tag =
233
234	return SendToPort(source.port, PRODUCER_ADDITIONAL_BUFFER_REQUESTED, &command, sizeof(command));
235}
236
237
238status_t
239BBufferConsumer::RequestAdditionalBuffer(const media_source& source,
240	bigtime_t startTime, void *_reserved)
241{
242	CALLED();
243	if (IS_INVALID_SOURCE(source))
244		return B_MEDIA_BAD_SOURCE;
245
246	producer_additional_buffer_requested_command command;
247
248	command.source = source;
249	command.prev_buffer = 0;
250	command.prev_time = startTime;
251	command.has_seek_tag = false;
252
253	return SendToPort(source.port, PRODUCER_ADDITIONAL_BUFFER_REQUESTED,
254		&command, sizeof(command));
255}
256
257
258status_t
259BBufferConsumer::SetOutputBuffersFor(const media_source &source,
260	const media_destination &destination, BBufferGroup *group, void *user_data,
261	int32 *change_tag, bool will_reclaim, void *_reserved_)
262{
263	CALLED();
264
265	if (IS_INVALID_SOURCE(source))
266		return B_MEDIA_BAD_SOURCE;
267	if (IS_INVALID_DESTINATION(destination))
268		return B_MEDIA_BAD_DESTINATION;
269
270	int32 buffer_count = 0;
271
272	if (group != NULL) {
273		if (group->CountBuffers(&buffer_count) != B_OK)
274			return B_ERROR;
275	}
276
277	size_t size = sizeof(producer_set_buffer_group_command)
278		+ buffer_count * sizeof(media_buffer_id);
279
280	producer_set_buffer_group_command *command
281		= static_cast<producer_set_buffer_group_command *>(malloc(size));
282	MemoryDeleter deleter(command);
283
284	command->source = source;
285	command->destination = destination;
286	command->user_data = user_data;
287	command->change_tag = NewChangeTag();
288
289	BBuffer *buffers[buffer_count];
290	if (buffer_count != 0) {
291		if (group->GetBufferList(buffer_count, buffers) != B_OK)
292			return B_ERROR;
293		for (int32 i = 0; i < buffer_count; i++)
294			command->buffers[i] = buffers[i]->ID();
295	}
296
297	command->buffer_count = buffer_count;
298
299	if (change_tag != NULL)
300		*change_tag = command->change_tag;
301
302	status_t status = SendToPort(source.port, PRODUCER_SET_BUFFER_GROUP,
303		command, size);
304
305	if (status == B_OK) {
306		// XXX will leak memory if port write failed
307		delete fDeleteBufferGroup;
308		fDeleteBufferGroup = will_reclaim ? NULL : group;
309	}
310	return status;
311}
312
313
314status_t
315BBufferConsumer::SendLatencyChange(const media_source& source,
316	const media_destination& destination, bigtime_t newLatency, uint32 flags)
317{
318	CALLED();
319	if (IS_INVALID_SOURCE(source))
320		return B_MEDIA_BAD_SOURCE;
321	if (IS_INVALID_DESTINATION(destination))
322		return B_MEDIA_BAD_DESTINATION;
323
324	producer_latency_changed_command command;
325
326	command.source = source;
327	command.destination = destination;
328	command.latency = newLatency;
329	command.flags = flags;
330
331	TRACE("###### BBufferConsumer::SendLatencyChange: latency from %" B_PRId32 "/%" B_PRId32 " to "
332		"%" B_PRId32 "/%" B_PRId32 " changed to %" B_PRId64 "\n", source.port, source.id,
333		destination.port, destination.id, newLatency);
334
335	return SendToPort(source.port, PRODUCER_LATENCY_CHANGED, &command,
336		sizeof(command));
337}
338
339
340status_t
341BBufferConsumer::HandleMessage(int32 message, const void* data, size_t size)
342{
343	PRINT(4, "BBufferConsumer::HandleMessage %#lx, node %ld\n", message, ID());
344	status_t rv;
345	switch (message) {
346		case CONSUMER_ACCEPT_FORMAT:
347		{
348			const consumer_accept_format_request* request
349				= static_cast<const consumer_accept_format_request*>(data);
350
351			consumer_accept_format_reply reply;
352			reply.format = request->format;
353			status_t status = AcceptFormat(request->dest, &reply.format);
354			request->SendReply(status, &reply, sizeof(reply));
355			return B_OK;
356		}
357
358		case CONSUMER_GET_NEXT_INPUT:
359		{
360			const consumer_get_next_input_request *request = static_cast<const consumer_get_next_input_request *>(data);
361			consumer_get_next_input_reply reply;
362			reply.cookie = request->cookie;
363			rv = GetNextInput(&reply.cookie, &reply.input);
364			request->SendReply(rv, &reply, sizeof(reply));
365			return B_OK;
366		}
367
368		case CONSUMER_DISPOSE_INPUT_COOKIE:
369		{
370			const consumer_dispose_input_cookie_request *request = static_cast<const consumer_dispose_input_cookie_request *>(data);
371			consumer_dispose_input_cookie_reply reply;
372			DisposeInputCookie(request->cookie);
373			request->SendReply(B_OK, &reply, sizeof(reply));
374			return B_OK;
375		}
376
377		case CONSUMER_BUFFER_RECEIVED:
378		{
379			const consumer_buffer_received_command* command
380				= static_cast<const consumer_buffer_received_command*>(data);
381
382			BBuffer* buffer = fBufferCache->GetBuffer(command->buffer,
383				command->header.source_port);
384			if (buffer == NULL) {
385				ERROR("BBufferConsumer::CONSUMER_BUFFER_RECEIVED can't"
386					"find the buffer\n");
387			} else {
388				buffer->SetHeader(&command->header);
389
390				PRINT(4, "calling BBufferConsumer::BufferReceived buffer %ld "
391					"at perf %lld and TimeSource()->Now() is %lld\n",
392					buffer->Header()->buffer, buffer->Header()->start_time,
393					TimeSource()->Now());
394
395				BufferReceived(buffer);
396			}
397			return B_OK;
398		}
399
400		case CONSUMER_PRODUCER_DATA_STATUS:
401		{
402			const consumer_producer_data_status_command *command = static_cast<const consumer_producer_data_status_command *>(data);
403			ProducerDataStatus(command->for_whom, command->status, command->at_performance_time);
404			return B_OK;
405		}
406
407		case CONSUMER_GET_LATENCY_FOR:
408		{
409			const consumer_get_latency_for_request *request = static_cast<const consumer_get_latency_for_request *>(data);
410			consumer_get_latency_for_reply reply;
411			rv = GetLatencyFor(request->for_whom, &reply.latency, &reply.timesource);
412			request->SendReply(rv, &reply, sizeof(reply));
413			return B_OK;
414		}
415
416		case CONSUMER_CONNECTED:
417		{
418			const consumer_connected_request *request = static_cast<const consumer_connected_request *>(data);
419			consumer_connected_reply reply;
420			reply.input = request->input;
421			rv = Connected(request->input.source, request->input.destination, request->input.format, &reply.input);
422			request->SendReply(rv, &reply, sizeof(reply));
423			return B_OK;
424		}
425
426		case CONSUMER_DISCONNECTED:
427		{
428			const consumer_disconnected_request *request = static_cast<const consumer_disconnected_request *>(data);
429			// We no longer need to cache the buffers requested by the other end
430			// of this port.
431			fBufferCache->FlushCacheForPort(request->source.port);
432
433			consumer_disconnected_reply reply;
434			Disconnected(request->source, request->destination);
435			request->SendReply(B_OK, &reply, sizeof(reply));
436			return B_OK;
437		}
438
439		case CONSUMER_FORMAT_CHANGED:
440		{
441			const consumer_format_changed_request *request = static_cast<const consumer_format_changed_request *>(data);
442			consumer_format_changed_reply reply;
443			rv = FormatChanged(request->producer, request->consumer, request->change_tag, request->format);
444			request->SendReply(rv, &reply, sizeof(reply));
445
446			// XXX is this RequestCompleted() correct?
447			node_request_completed_command completedcommand;
448			completedcommand.info.what = media_request_info::B_FORMAT_CHANGED;
449			completedcommand.info.change_tag = request->change_tag;
450			completedcommand.info.status = reply.result;
451			//completedcommand.info.cookie
452			completedcommand.info.user_data = 0;
453			completedcommand.info.source = request->producer;
454			completedcommand.info.destination = request->consumer;
455			completedcommand.info.format = request->format;
456			SendToPort(request->consumer.port, NODE_REQUEST_COMPLETED, &completedcommand, sizeof(completedcommand));
457			return B_OK;
458		}
459
460		case CONSUMER_SEEK_TAG_REQUESTED:
461		{
462			const consumer_seek_tag_requested_request *request = static_cast<const consumer_seek_tag_requested_request *>(data);
463			consumer_seek_tag_requested_reply reply;
464			rv = SeekTagRequested(request->destination, request->target_time, request->flags, &reply.seek_tag, &reply.tagged_time, &reply.flags);
465			request->SendReply(rv, &reply, sizeof(reply));
466			return B_OK;
467		}
468	}
469	return B_ERROR;
470}
471
472status_t
473BBufferConsumer::SeekTagRequested(const media_destination &destination,
474								  bigtime_t in_target_time,
475								  uint32 in_flags,
476								  media_seek_tag *out_seek_tag,
477								  bigtime_t *out_tagged_time,
478								  uint32 *out_flags)
479{
480	CALLED();
481	// may be implemented by derived classes
482	return B_ERROR;
483}
484
485
486// #pragma mark - private BBufferConsumer
487
488
489/*
490not implemented:
491BBufferConsumer::BBufferConsumer()
492BBufferConsumer::BBufferConsumer(const BBufferConsumer &clone)
493BBufferConsumer & BBufferConsumer::operator=(const BBufferConsumer &clone)
494*/
495
496
497/*!	Deprecated function for BeOS R4.
498*/
499/* static */ status_t
500BBufferConsumer::SetVideoClippingFor(const media_source &output,
501									 const int16 *shorts,
502									 int32 short_count,
503									 const media_video_display_info &display,
504									 int32 *change_tag)
505{
506	CALLED();
507	if (IS_INVALID_SOURCE(output))
508		return B_MEDIA_BAD_SOURCE;
509	if (short_count > int(B_MEDIA_MESSAGE_SIZE - sizeof(producer_video_clipping_changed_command)) / 2)
510		debugger("BBufferConsumer::SetVideoClippingFor short_count too large (8000 limit)\n");
511
512	producer_video_clipping_changed_command *command;
513	size_t size;
514	status_t rv;
515
516	size = sizeof(producer_video_clipping_changed_command) + short_count * sizeof(short);
517	command = static_cast<producer_video_clipping_changed_command *>(malloc(size));
518	command->source = output;
519	command->destination = media_destination::null;
520	command->display = display;
521	command->user_data = 0;
522	command->change_tag = NewChangeTag();
523	command->short_count = short_count;
524	memcpy(command->shorts, shorts, short_count * sizeof(short));
525	if (change_tag != NULL)
526		*change_tag = command->change_tag;
527
528	rv = SendToPort(output.port, PRODUCER_VIDEO_CLIPPING_CHANGED, command, size);
529	free(command);
530	return rv;
531}
532
533
534/*!	Deprecated function for BeOS R4.
535*/
536/*static*/ status_t
537BBufferConsumer::RequestFormatChange(const media_source& source,
538	const media_destination& destination, media_format* format,
539	int32* _changeTag)
540{
541	CALLED();
542	if (IS_INVALID_SOURCE(source))
543		return B_MEDIA_BAD_SOURCE;
544	if (IS_INVALID_DESTINATION(destination))
545		return B_MEDIA_BAD_DESTINATION;
546
547	producer_format_change_requested_command command;
548
549	command.source = source;
550	command.destination = destination;
551	command.format = *format;
552	command.user_data = 0;
553	command.change_tag = NewChangeTag();
554	if (_changeTag != NULL)
555		*_changeTag = command.change_tag;
556
557	return SendToPort(source.port, PRODUCER_FORMAT_CHANGE_REQUESTED, &command,
558		sizeof(command));
559}
560
561
562/*!	Deprecated function for BeOS R4.
563*/
564/*static*/ status_t
565BBufferConsumer::SetOutputEnabled(const media_source& source, bool enabled,
566	int32* _changeTag)
567{
568	CALLED();
569	if (IS_INVALID_SOURCE(source))
570		return B_MEDIA_BAD_SOURCE;
571
572	producer_enable_output_command command;
573
574	command.source = source;
575	command.destination = media_destination::null;
576	command.enabled = enabled;
577	command.user_data = 0;
578	command.change_tag = NewChangeTag();
579	if (_changeTag != NULL)
580		*_changeTag = command.change_tag;
581
582	return SendToPort(source.port, PRODUCER_ENABLE_OUTPUT, &command,
583		sizeof(command));
584}
585
586
587status_t BBufferConsumer::_Reserved_BufferConsumer_0(void*) { return B_ERROR; }
588status_t BBufferConsumer::_Reserved_BufferConsumer_1(void*) { return B_ERROR; }
589status_t BBufferConsumer::_Reserved_BufferConsumer_2(void*) { return B_ERROR; }
590status_t BBufferConsumer::_Reserved_BufferConsumer_3(void*) { return B_ERROR; }
591status_t BBufferConsumer::_Reserved_BufferConsumer_4(void*) { return B_ERROR; }
592status_t BBufferConsumer::_Reserved_BufferConsumer_5(void*) { return B_ERROR; }
593status_t BBufferConsumer::_Reserved_BufferConsumer_6(void*) { return B_ERROR; }
594status_t BBufferConsumer::_Reserved_BufferConsumer_7(void*) { return B_ERROR; }
595status_t BBufferConsumer::_Reserved_BufferConsumer_8(void*) { return B_ERROR; }
596status_t BBufferConsumer::_Reserved_BufferConsumer_9(void*) { return B_ERROR; }
597status_t BBufferConsumer::_Reserved_BufferConsumer_10(void*) { return B_ERROR; }
598status_t BBufferConsumer::_Reserved_BufferConsumer_11(void*) { return B_ERROR; }
599status_t BBufferConsumer::_Reserved_BufferConsumer_12(void*) { return B_ERROR; }
600status_t BBufferConsumer::_Reserved_BufferConsumer_13(void*) { return B_ERROR; }
601status_t BBufferConsumer::_Reserved_BufferConsumer_14(void*) { return B_ERROR; }
602status_t BBufferConsumer::_Reserved_BufferConsumer_15(void*) { return B_ERROR; }
603
604