[libcamera-devel] [PATCH v1 22/23] gst: libcamerasrc: Implement initial streaming

Kieran Bingham kieran.bingham at ideasonboard.com
Wed Jan 29 13:17:50 CET 2020


Hi Nicolas,

On 29/01/2020 03:32, Nicolas Dufresne wrote:
> From: Nicolas Dufresne <nicolas.dufresne at collabora.com>
> 
> With this patch, the element is now able to push buffers to the next
> element in the graph. The buffers are currently missing any metadata
> like timestamp, sequence number. The handling of the GstFlowReturn
> for multiple pads isn't using a GstFlowCombiner as it should.

Aha, so perhaps this patch is not intended to be built/used yet and is
just for example purposes?


> Signed-off-by: Nicolas Dufresne <nicolas.dufresne at collabora.com>
> ---
>  src/gstreamer/gstlibcamerapad.cpp |  18 ++-
>  src/gstreamer/gstlibcamerapad.h   |   2 +
>  src/gstreamer/gstlibcamerasrc.cpp | 191 +++++++++++++++++++++++++++++-
>  3 files changed, 208 insertions(+), 3 deletions(-)
> 
> diff --git a/src/gstreamer/gstlibcamerapad.cpp b/src/gstreamer/gstlibcamerapad.cpp
> index 4a775e4..7bc44c1 100644
> --- a/src/gstreamer/gstlibcamerapad.cpp
> +++ b/src/gstreamer/gstlibcamerapad.cpp
> @@ -152,7 +152,7 @@ gst_libcamera_pad_push_pending(GstPad *pad)
>  {
>  	auto *self = GST_LIBCAMERA_PAD(pad);
>  	GstBuffer *buffer;
> -	GstFlowReturn ret = GST_FLOW_CUSTOM_SUCCESS;
> +	GstFlowReturn ret = GST_FLOW_OK;
>  
>  	{
>  		GST_OBJECT_LOCKER(self);
> @@ -164,3 +164,19 @@ gst_libcamera_pad_push_pending(GstPad *pad)
>  
>  	return ret;
>  }
> +
> +bool
> +gst_libcamera_pad_has_pending(GstPad *pad)
> +{
> +	auto *self = GST_LIBCAMERA_PAD(pad);
> +	GST_OBJECT_LOCKER(self);
> +	return (self->pending_buffers.length > 0);
> +}
> +
> +void
> +gst_libcamera_pad_set_latency(GstPad *pad, GstClockTime latency)
> +{
> +	auto *self = GST_LIBCAMERA_PAD(pad);
> +	GST_OBJECT_LOCKER(self);
> +	self->latency = latency;

self->latency isn't defined yet, and I don't think this function is used,
so it probably needs to be moved to the timestamp patch you are working on.

> +}
> diff --git a/src/gstreamer/gstlibcamerapad.h b/src/gstreamer/gstlibcamerapad.h
> index d928570..eb24000 100644
> --- a/src/gstreamer/gstlibcamerapad.h
> +++ b/src/gstreamer/gstlibcamerapad.h
> @@ -30,4 +30,6 @@ void gst_libcamera_pad_queue_buffer(GstPad *pad, GstBuffer *buffer);
>  
>  GstFlowReturn gst_libcamera_pad_push_pending(GstPad *pad);
>  
> +bool gst_libcamera_pad_has_pending(GstPad *pad);
> +
>  #endif /* __GST_LIBCAMERA_PAD_H__ */
> diff --git a/src/gstreamer/gstlibcamerasrc.cpp b/src/gstreamer/gstlibcamerasrc.cpp
> index 5fc4393..947a8bf 100644
> --- a/src/gstreamer/gstlibcamerasrc.cpp
> +++ b/src/gstreamer/gstlibcamerasrc.cpp
> @@ -12,8 +12,10 @@
>  #include "gstlibcamerapool.h"
>  #include "gstlibcamera-utils.h"
>  
> +#include <queue>
>  #include <libcamera/camera.h>
>  #include <libcamera/camera_manager.h>
> +#include <gst/base/base.h>
>  
>  using namespace libcamera;
>  
> @@ -22,12 +24,73 @@ GST_DEBUG_CATEGORY_STATIC(source_debug);
>  
>  #define STREAM_LOCKER(obj) g_autoptr(GRecMutexLocker) stream_locker = g_rec_mutex_locker_new(&GST_LIBCAMERA_SRC(obj)->stream_lock)
>  
> -/* Used for C++ object with destructors */
> +struct RequestWrap {
> +	RequestWrap(Request *request);
> +	~RequestWrap();
> +
> +	void AttachBuffer(GstBuffer *buffer);
> +	GstBuffer *DetachBuffer(Stream *stream);
> +
> +	/* For ptr comparison only */
> +	Request *request_;
> +	std::map<Stream *, GstBuffer *> buffers_;
> +};
> +
> +RequestWrap::RequestWrap(Request *request)
> +	: request_(request)
> +{
> +}
> +
> +RequestWrap::~RequestWrap()
> +{
> +	for (std::pair<Stream * const, GstBuffer *> &item : buffers_) {
> +		if (item.second)
> +			gst_buffer_unref(item.second);
> +	}
> +}
> +
> +void
> +RequestWrap::AttachBuffer(GstBuffer *buffer)
> +{
> +	FrameBuffer *fb = gst_libcamera_buffer_get_frame_buffer(buffer);
> +	Stream *stream = gst_libcamera_buffer_get_stream(buffer);
> +
> +	request_->addBuffer(stream, fb);
> +
> +	auto item = buffers_.find(stream);
> +	if (item != buffers_.end()) {
> +		gst_buffer_unref(item->second);
> +		item->second = buffer;
> +	} else {
> +		buffers_[stream] = buffer;
> +	}
> +}
> +
> +GstBuffer *
> +RequestWrap::DetachBuffer(Stream *stream)
> +{
> +	GstBuffer *buffer = nullptr;
> +
> +	auto item = buffers_.find(stream);
> +	if (item != buffers_.end()) {
> +		buffer = item->second;
> +		item->second = nullptr;
> +	}
> +
> +	return buffer;
> +}
> +
> +/* Used for C++ object with destructors and callbacks */
>  struct GstLibcameraSrcState {
> +	GstLibcameraSrc *src;
> +
>  	std::shared_ptr<CameraManager> cm;
>  	std::shared_ptr<Camera> cam;
>  	std::unique_ptr<CameraConfiguration> config;
>  	std::vector<GstPad *> srcpads;
> +	std::queue<std::unique_ptr<RequestWrap>> requests;
> +
> +	void requestCompleted(Request *request);
>  };
>  
>  struct _GstLibcameraSrc {
> @@ -40,6 +103,7 @@ struct _GstLibcameraSrc {
>  
>  	GstLibcameraSrcState *state;
>  	GstLibcameraAllocator *allocator;
> +	GstFlowCombiner *flow_combiner;
>  };
>  
>  enum {
> @@ -63,6 +127,41 @@ GstStaticPadTemplate request_src_template = {
>  	"src_%s", GST_PAD_SRC, GST_PAD_REQUEST, TEMPLATE_CAPS
>  };
>  
> +void
> +GstLibcameraSrcState::requestCompleted(Request *request)
> +{
> +	GST_OBJECT_LOCKER(this->src);
> +
> +	GST_DEBUG_OBJECT(this->src, "buffers are ready");
> +
> +	std::unique_ptr<RequestWrap> wrap = std::move(this->requests.front());
> +	this->requests.pop();
> +
> +	g_return_if_fail(wrap->request_ == request);
> +
> +	if ((request->status() == Request::RequestCancelled)) {
> +		GST_DEBUG_OBJECT(this->src, "Request was cancelled");
> +		return;
> +	}
> +
> +	GstBuffer *buffer;
> +	for (GstPad *srcpad : this->srcpads) {
> +		Stream *stream = gst_libcamera_pad_get_stream(srcpad);
> +		buffer = wrap->DetachBuffer(stream);
> +		gst_libcamera_pad_queue_buffer(srcpad, buffer);
> +	}
> +
> +	{
> +		/* We only want to resume the task if it's paused */
> +		GstTask *task = this->src->task;
> +		GST_OBJECT_LOCKER(task);
> +		if (GST_TASK_STATE(task) == GST_TASK_PAUSED) {
> +			GST_TASK_STATE(task) = GST_TASK_STARTED;
> +			GST_TASK_SIGNAL(task);
> +		}
> +	}
> +}
> +
>  static bool
>  gst_libcamera_src_open(GstLibcameraSrc *self)
>  {
> @@ -115,6 +214,8 @@ gst_libcamera_src_open(GstLibcameraSrc *self)
>  		return false;
>  	}
>  
> +	cam->requestCompleted.connect(self->state, &GstLibcameraSrcState::requestCompleted);
> +
>  	/* no need to lock here we didn't start our threads */
>  	self->state->cm = cm;
>  	self->state->cam = cam;
> @@ -126,8 +227,74 @@ static void
>  gst_libcamera_src_task_run(gpointer user_data)
>  {
>  	GstLibcameraSrc *self = GST_LIBCAMERA_SRC(user_data);
> +	GstLibcameraSrcState *state = self->state;
> +
> +	Request *request = new Request(state->cam.get());
> +	auto wrap = std::make_unique<RequestWrap>(request);
> +	for (GstPad *srcpad : state->srcpads) {
> +		GstLibcameraPool *pool = gst_libcamera_pad_get_pool(srcpad);
> +		GstBuffer *buffer;
> +		GstFlowReturn ret;
> +
> +		ret = gst_buffer_pool_acquire_buffer(GST_BUFFER_POOL(pool),
> +						     &buffer, nullptr);
> +		if (ret != GST_FLOW_OK) {
> +			/* RequestWrap does not take ownership, and we won't be
> +			 * queueing this one due to lack of buffers */
> +			delete request;
> +			request = NULL;
> +			break;
> +		}
> +
> +		wrap->AttachBuffer(buffer);
> +	}
> +
> +	if (request) {
> +		GST_OBJECT_LOCKER(self);
> +		GST_TRACE_OBJECT(self, "Requesting buffers");
> +		state->cam->queueRequest(request);
> +		state->requests.push(std::move(wrap));
> +	}
> +
> +	GstFlowReturn ret = GST_FLOW_OK;
> +	gst_flow_combiner_reset(self->flow_combiner);
> +	for (GstPad *srcpad : state->srcpads) {
> +		ret = gst_libcamera_pad_push_pending(srcpad);
> +		ret = gst_flow_combiner_update_pad_flow(self->flow_combiner,
> +							srcpad, ret);
> +	}
> +
> +	{
> +		/* Here we need to decide if we want to pause or stop the task. This
> +		 * needs to happen in lock step with the callback thread which may want
> +		 * to resume the task.
> +		 */
> +		GST_OBJECT_LOCKER(self);
> +		if (ret != GST_FLOW_OK) {
> +			if (ret == GST_FLOW_EOS) {
> +				g_autoptr(GstEvent) eos = gst_event_new_eos();
> +				guint32 seqnum = gst_util_seqnum_next();
> +				gst_event_set_seqnum(eos, seqnum);
> +				for (GstPad *srcpad : state->srcpads)
> +					gst_pad_push_event(srcpad, gst_event_ref(eos));
> +			} else if (ret != GST_FLOW_FLUSHING) {
> +				GST_ELEMENT_FLOW_ERROR(self, ret);
> +			}
> +			gst_task_stop(self->task);
> +			return;
> +		}
>  
> -	GST_DEBUG_OBJECT(self, "Streaming thread it now capturing");
> +		gboolean do_pause = true;
> +		for (GstPad *srcpad : state->srcpads) {
> +			if (gst_libcamera_pad_has_pending(srcpad)) {
> +				do_pause = false;
> +				break;
> +			}
> +		}
> +
> +		if (do_pause)
> +			gst_task_pause(self->task);
> +	}
>  }
>  
>  static void
> @@ -137,6 +304,7 @@ gst_libcamera_src_task_enter(GstTask *task, GThread *thread, gpointer user_data)
>  	GstLibcameraSrc *self = GST_LIBCAMERA_SRC(user_data);
>  	GstLibcameraSrcState *state = self->state;
>  	GstFlowReturn flow_ret = GST_FLOW_OK;
> +	gint ret = 0;
>  
>  	GST_DEBUG_OBJECT(self, "Streaming thread has started");
>  
> @@ -219,12 +387,23 @@ gst_libcamera_src_task_enter(GstTask *task, GThread *thread, gpointer user_data)
>  		return;
>  	}
>  
> +	self->flow_combiner = gst_flow_combiner_new();
>  	for (gsize i = 0; i < state->srcpads.size(); i++) {
>  		GstPad *srcpad = state->srcpads[i];
>  		const StreamConfiguration &stream_cfg = state->config->at(i);
>  		GstLibcameraPool *pool = gst_libcamera_pool_new(self->allocator,
>  								stream_cfg.stream());
>  		gst_libcamera_pad_set_pool(srcpad, pool);
> +		gst_flow_combiner_add_pad(self->flow_combiner, srcpad);
> +	}
> +
> +	ret = state->cam->start();
> +	if (ret) {
> +		GST_ELEMENT_ERROR(self, RESOURCE, SETTINGS,
> +				  ("Failed to start the camera: %s", g_strerror(-ret)),
> +				  ("Camera.start() failed with error code %i", ret));
> +		gst_task_stop(task);
> +		return;
>  	}
>  
>  done:
> @@ -254,6 +433,8 @@ gst_libcamera_src_task_leave(GstTask *task, GThread *thread, gpointer user_data)
>  		gst_libcamera_pad_set_pool(srcpad, NULL);
>  
>  	g_clear_object(&self->allocator);
> +	g_clear_pointer(&self->flow_combiner,
> +			(GDestroyNotify)gst_flow_combiner_free);
>  }
>  
>  static void
> @@ -333,6 +514,9 @@ gst_libcamera_src_change_state(GstElement *element, GstStateChange transition)
>  			return GST_STATE_CHANGE_FAILURE;
>  		ret = GST_STATE_CHANGE_NO_PREROLL;
>  		break;
> +	case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
> +		gst_task_start(self->task);
> +		break;
>  	case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
>  		ret = GST_STATE_CHANGE_NO_PREROLL;
>  		break;
> @@ -378,6 +562,9 @@ gst_libcamera_src_init(GstLibcameraSrc *self)
>  
>  	state->srcpads.push_back(gst_pad_new_from_template(templ, "src"));
>  	gst_element_add_pad(GST_ELEMENT(self), state->srcpads[0]);
> +
> +	/* C-style friend */
> +	state->src = self;
>  	self->state = state;
>  }
>  
> 

-- 
Regards
--
Kieran


More information about the libcamera-devel mailing list