[libcamera-devel] [PATCH v2 22/27] gst: libcamerasrc: Implement initial streaming

Nicolas Dufresne nicolas.dufresne at collabora.com
Sat Feb 29 16:19:02 CET 2020


Le samedi 29 février 2020 à 16:54 +0200, Laurent Pinchart a écrit :
> Hi Nicolas,
> 
> Thank you for the patch.
> 
> On Thu, Feb 27, 2020 at 03:04:02PM -0500, Nicolas Dufresne wrote:
> > With this patch, the element is now able to push buffers to the next
> > element in the graph. The buffers are currently missing any metadata
> > like timestamp, sequence number. This will be added in the next commit.
> > 
> > Signed-off-by: Nicolas Dufresne <nicolas.dufresne at collabora.com>
> > ---
> >  src/gstreamer/gstlibcamerapad.cpp |  11 +-
> >  src/gstreamer/gstlibcamerapad.h   |   2 +
> >  src/gstreamer/gstlibcamerasrc.cpp | 192 +++++++++++++++++++++++++++++-
> >  3 files changed, 202 insertions(+), 3 deletions(-)
> > 
> > diff --git a/src/gstreamer/gstlibcamerapad.cpp b/src/gstreamer/gstlibcamerapad.cpp
> > index 8662c92..2cf1630 100644
> > --- a/src/gstreamer/gstlibcamerapad.cpp
> > +++ b/src/gstreamer/gstlibcamerapad.cpp
> > @@ -18,6 +18,7 @@ struct _GstLibcameraPad {
> >  	StreamRole role;
> >  	GstLibcameraPool *pool;
> >  	GQueue pending_buffers;
> > +	GstClockTime latency;
> >  };
> >  
> >  enum {
> > @@ -159,7 +160,15 @@ gst_libcamera_pad_push_pending(GstPad *pad)
> >  	}
> >  
> >  	if (!buffer)
> > -		return GST_FLOW_CUSTOM_SUCCESS;
> > +		return GST_FLOW_OK;
> 
> Does this belong to the previous patch ?

I forgot why it was custom success initially, it's needed to make
streaming work, I should probably try and squash this into previous
commits indeed.

> 
> >  
> >  	return gst_pad_push(pad, buffer);
> >  }
> > +
> > +bool
> > +gst_libcamera_pad_has_pending(GstPad *pad)
> > +{
> > +	auto *self = GST_LIBCAMERA_PAD(pad);
> > +	GLibLocker lock(GST_OBJECT(self));
> > +	return (self->pending_buffers.length > 0);
> 
> No need for the outer parentheses.
> 
> > +}
> > diff --git a/src/gstreamer/gstlibcamerapad.h b/src/gstreamer/gstlibcamerapad.h
> > index d928570..eb24000 100644
> > --- a/src/gstreamer/gstlibcamerapad.h
> > +++ b/src/gstreamer/gstlibcamerapad.h
> > @@ -30,4 +30,6 @@ void gst_libcamera_pad_queue_buffer(GstPad *pad, GstBuffer *buffer);
> >  
> >  GstFlowReturn gst_libcamera_pad_push_pending(GstPad *pad);
> >  
> > +bool gst_libcamera_pad_has_pending(GstPad *pad);
> > +
> >  #endif /* __GST_LIBCAMERA_PAD_H__ */
> > diff --git a/src/gstreamer/gstlibcamerasrc.cpp b/src/gstreamer/gstlibcamerasrc.cpp
> > index 83f93cc..70f2048 100644
> > --- a/src/gstreamer/gstlibcamerasrc.cpp
> > +++ b/src/gstreamer/gstlibcamerasrc.cpp
> > @@ -18,8 +18,10 @@
> >  #include "gstlibcamerapool.h"
> >  #include "gstlibcamerasrc.h"
> >  
> > +#include <gst/base/base.h>
> >  #include <libcamera/camera.h>
> >  #include <libcamera/camera_manager.h>
> > +#include <queue>
> >  #include <vector>
> >  
> >  using namespace libcamera;
> > @@ -27,12 +29,71 @@ using namespace libcamera;
> >  GST_DEBUG_CATEGORY_STATIC(source_debug);
> >  #define GST_CAT_DEFAULT source_debug
> >  
> > +struct RequestWrap {
> > +	RequestWrap(Request *request);
> > +	~RequestWrap();
> > +
> > +	void attachBuffer(GstBuffer *buffer);
> > +	GstBuffer *detachBuffer(Stream *stream);
> > +
> > +	/* For ptr comparision only. */
> 
> s/comparision/comparison/
> 
> > +	Request *request_;
> > +	std::map<Stream *, GstBuffer *> buffers_;
> > +};
> > +
> > +RequestWrap::RequestWrap(Request *request)
> > +	: request_(request)
> > +{
> > +}
> > +
> > +RequestWrap::~RequestWrap()
> > +{
> > +	for (std::pair<Stream *const, GstBuffer *> &item : buffers_) {
> > +		if (item.second)
> > +			gst_buffer_unref(item.second);
> > +	}
> > +}
> > +
> > +void RequestWrap::attachBuffer(GstBuffer *buffer)
> > +{
> > +	FrameBuffer *fb = gst_libcamera_buffer_get_frame_buffer(buffer);
> > +	Stream *stream = gst_libcamera_buffer_get_stream(buffer);
> > +
> > +	request_->addBuffer(stream, fb);
> > +
> > +	auto item = buffers_.find(stream);
> > +	if (item != buffers_.end()) {
> > +		gst_buffer_unref(item->second);
> > +		item->second = buffer;
> > +	} else {
> > +		buffers_[stream] = buffer;
> > +	}
> > +}
> > +
> > +GstBuffer *RequestWrap::detachBuffer(Stream *stream)
> > +{
> > +	GstBuffer *buffer = nullptr;
> > +
> > +	auto item = buffers_.find(stream);
> > +	if (item != buffers_.end()) {
> > +		buffer = item->second;
> > +		item->second = nullptr;
> 
> Do you really want to set second to nullptr here, or should you
> 
> 		buffers_.erase(item);
> 
> ?

I'm just not familiar enough with the data structure to really answer,
what does erase do ? This basically map a stream and a buffer, if the
buffer is out for delivery, isn't it logical to map it back to no
buffer rather then removing the mapping? Will erasing the mapping will
cause an free/alloc ?
> 
> > +	}
> > +
> > +	return buffer;
> > +}
> > +
> >  /* Used for C++ object with destructors. */
> >  struct GstLibcameraSrcState {
> > +	GstLibcameraSrc *src;
> > +
> >  	std::unique_ptr<CameraManager> cm;
> >  	std::shared_ptr<Camera> cam;
> >  	std::unique_ptr<CameraConfiguration> config;
> >  	std::vector<GstPad *> srcpads;
> > +	std::queue<std::unique_ptr<RequestWrap>> requests;
> > +
> > +	void requestCompleted(Request *request);
> >  };
> >  
> >  struct _GstLibcameraSrc {
> > @@ -45,6 +106,7 @@ struct _GstLibcameraSrc {
> >  
> >  	GstLibcameraSrcState *state;
> >  	GstLibcameraAllocator *allocator;
> > +	GstFlowCombiner *flow_combiner;
> >  };
> >  
> >  enum {
> > @@ -68,6 +130,41 @@ GstStaticPadTemplate request_src_template = {
> >  	"src_%s", GST_PAD_SRC, GST_PAD_REQUEST, TEMPLATE_CAPS
> >  };
> >  
> > +void
> > +GstLibcameraSrcState::requestCompleted(Request *request)
> > +{
> > +	GLibLocker lock(GST_OBJECT(this->src));
> 
> This is a member function, so you can drop the "this->" and write "src".
> Same for everything below. You may want to suffix the data members with
> _ to avoid confusion though.

The _ style is probably what I dislike the most of libcamera coding
style, so I was trying to get away with making Request a struct and
using c-style :-), but here I thought it was not obvious if I didn't
use this->. I can adhere to the style, I'll need to do that in all
internal C++ structs.

> 
> Do we need to lock the whole function ? The requestComplete slot is
> called from the pipeline handler thread, so we should minimize the time
> we spent here, especially with locks held. Are there any expensive
> operation below, is there a risk of delay due to lock contention ?
> 
> > +
> > +	GST_DEBUG_OBJECT(this->src, "buffers are ready");
> > +
> > +	std::unique_ptr<RequestWrap> wrap = std::move(this->requests.front());
> > +	this->requests.pop();

Must be locked, fast.

> > +
> > +	g_return_if_fail(wrap->request_ == request);
> > +
> > +	if ((request->status() == Request::RequestCancelled)) {
> > +		GST_DEBUG_OBJECT(this->src, "Request was cancelled");
> > +		return;
> > +	}
> > +
> > +	GstBuffer *buffer;
> > +	for (GstPad *srcpad : this->srcpads) {
> > +		Stream *stream = gst_libcamera_pad_get_stream(srcpad);
> > +		buffer = wrap->detachBuffer(stream);
> > +		gst_libcamera_pad_queue_buffer(srcpad, buffer);
> > +	}

Must be locked, also fast.

> > +
> > +	{
> > +		/* We only want to resume the task if it's paused. */
> > +		GstTask *task = this->src->task;
> > +		GLibLocker lock(GST_OBJECT(task));
> > +		if (GST_TASK_STATE(task) == GST_TASK_PAUSED) {
> > +			GST_TASK_STATE(task) = GST_TASK_STARTED;
> > +			GST_TASK_SIGNAL(task);
> > +		}
> > +	}

This also really fast. So I think it's fine. Object locks should only
be taken in GStreamer for short operation (in fact, if you try to do
something fancy with these held, you will have a deadlock). So I don't
think holding over the function is a concern. The entire thing here has
been designed so this callback only queues the ready buffers in various
places and wake up the pusher thread if needed. The pusher thread is
that one dealing with stream locks, which is by design contentious for
push back purpose.

> > +}
> > +
> >  static bool
> >  gst_libcamera_src_open(GstLibcameraSrc *self)
> >  {
> > @@ -120,6 +217,8 @@ gst_libcamera_src_open(GstLibcameraSrc *self)
> >  		return false;
> >  	}
> >  
> > +	cam->requestCompleted.connect(self->state, &GstLibcameraSrcState::requestCompleted);
> > +
> >  	/* No need to lock here, we didn't start our threads yet. */
> >  	self->state->cm = std::move(cm);
> >  	self->state->cam = cam;
> > @@ -131,17 +230,85 @@ static void
> >  gst_libcamera_src_task_run(gpointer user_data)
> >  {
> >  	GstLibcameraSrc *self = GST_LIBCAMERA_SRC(user_data);
> > +	GstLibcameraSrcState *state = self->state;
> > +
> > +	Request *request = new Request(state->cam.get());
> 
> You should use cam->createRequest(). Looks like we forgot to make the
> Request constructor private.
> 
> > +	auto wrap = std::make_unique<RequestWrap>(request);
> > +	for (GstPad *srcpad : state->srcpads) {
> > +		GstLibcameraPool *pool = gst_libcamera_pad_get_pool(srcpad);
> > +		GstBuffer *buffer;
> > +		GstFlowReturn ret;
> > +
> > +		ret = gst_buffer_pool_acquire_buffer(GST_BUFFER_POOL(pool),
> > +						     &buffer, nullptr);
> > +		if (ret != GST_FLOW_OK) {
> > +			/* RequestWrap does not take ownership, and we won't be
> > +			 * queueing this one due to lack of buffers. */
> 
> 			/*
> 			 * ...
> 			 */
> 
> and we should fix this in libcamera, the API makes request ownership
> awkward (not that the implementation is broken, it's just the API).

Ok, will add a \todo while at it.

> 
> > +			delete request;
> > +			request = NULL;
> 
> s/NULL/nullptr/

Damn, I was sure I managed to catch them all.

> 
> > +			break;
> > +		}
> > +
> > +		wrap->attachBuffer(buffer);
> > +	}
> > +
> > +	if (request) {
> > +		GLibLocker lock(GST_OBJECT(self));
> > +		GST_TRACE_OBJECT(self, "Requesting buffers");
> > +		state->cam->queueRequest(request);
> > +		state->requests.push(std::move(wrap));
> > +	}
> > +
> > +	GstFlowReturn ret = GST_FLOW_OK;
> > +	gst_flow_combiner_reset(self->flow_combiner);
> > +	for (GstPad *srcpad : state->srcpads) {
> > +		ret = gst_libcamera_pad_push_pending(srcpad);
> > +		ret = gst_flow_combiner_update_pad_flow(self->flow_combiner,
> > +							srcpad, ret);
> 
> We have a single pad so that's no an issue yet, but this looks ignores
> errors from any pad but the last.

I've guessed you mean "this loop". The flow combiner has a state, and
remember the last flow that was into it. See the last argument of
gst_flow_combiner_update_pad_flow(), but maybe you see something that I
don't.

> 
> > +	}
> >  
> > -	GST_DEBUG_OBJECT(self, "Streaming thread is now capturing");
> > +	{
> > +		/*
> > +		 * Here we need to decide if we want to pause or stop the task. This
> > +		 * needs to happen in lock step with the callback thread which may want
> > +		 * to resume the task.
> > +		 */
> > +		GLibLocker lock(GST_OBJECT(self));
> > +		if (ret != GST_FLOW_OK) {
> > +			if (ret == GST_FLOW_EOS) {
> > +				g_autoptr(GstEvent) eos = gst_event_new_eos();
> > +				guint32 seqnum = gst_util_seqnum_next();
> > +				gst_event_set_seqnum(eos, seqnum);
> > +				for (GstPad *srcpad : state->srcpads)
> > +					gst_pad_push_event(srcpad, gst_event_ref(eos));
> > +			} else if (ret != GST_FLOW_FLUSHING) {
> > +				GST_ELEMENT_FLOW_ERROR(self, ret);
> > +			}
> > +			gst_task_stop(self->task);
> > +			return;
> > +		}
> > +
> > +		gboolean do_pause = true;
> 
> I would replace this with a bool, or use TRUE here (and FALSE below).

I'd use a bool indeed. Whenever I use TRUE/FALSE is when it's an
external API using gboolean, cause gboolean is not the same size as
bool.

> 
> > +		for (GstPad *srcpad : state->srcpads) {
> > +			if (gst_libcamera_pad_has_pending(srcpad)) {
> > +				do_pause = false;
> > +				break;
> > +			}
> > +		}
> > +
> > +		if (do_pause)
> > +			gst_task_pause(self->task);
> > +	}
> >  }
> >  
> >  static void
> >  gst_libcamera_src_task_enter(GstTask *task, GThread *thread, gpointer user_data)
> >  {
> >  	GstLibcameraSrc *self = GST_LIBCAMERA_SRC(user_data);
> > -	GLibRecLocker(&self->stream_lock);
> > +	GLibRecLocker lock(&self->stream_lock);
> 
> This this fix a compilation error in a previous patch ?

This one was badly squashed, I'll fix. Was a typo I made, which can
have disastrous effect, but surprisingly GCC didn't even warn about
this (calling a constructor, but not storing the object).

> 
> >  	GstLibcameraSrcState *state = self->state;
> >  	GstFlowReturn flow_ret = GST_FLOW_OK;
> > +	gint ret = 0;
> 
> I don't think you need to initialize this to 0.
> 
> >  
> >  	GST_DEBUG_OBJECT(self, "Streaming thread has started");
> >  
> > @@ -230,12 +397,23 @@ gst_libcamera_src_task_enter(GstTask *task, GThread *thread, gpointer user_data)
> >  		return;
> >  	}
> >  
> > +	self->flow_combiner = gst_flow_combiner_new();
> 
> If an error occurs above and flow_combiner isn't set there, won't the
> g_clear_pointer() call below operate on an unitialized variable ?

g_clear_pointer() is made for GObject dispose() function, which can be
called multiple time if there is circular references. Short story, it's
nul-safe. As for GObject instance, they are always initialized with 0,
this is part of the API.

> 
> >  	for (gsize i = 0; i < state->srcpads.size(); i++) {
> >  		GstPad *srcpad = state->srcpads[i];
> >  		const StreamConfiguration &stream_cfg = state->config->at(i);
> >  		GstLibcameraPool *pool = gst_libcamera_pool_new(self->allocator,
> >  								stream_cfg.stream());
> >  		gst_libcamera_pad_set_pool(srcpad, pool);
> > +		gst_flow_combiner_add_pad(self->flow_combiner, srcpad);
> > +	}
> > +
> > +	ret = state->cam->start();
> > +	if (ret) {
> > +		GST_ELEMENT_ERROR(self, RESOURCE, SETTINGS,
> > +				  ("Failed to start the camera: %s", g_strerror(-ret)),
> > +				  ("Camera.start() failed with error code %i", ret));
> > +		gst_task_stop(task);
> > +		return;
> >  	}
> >  
> >  done:
> > @@ -257,10 +435,14 @@ gst_libcamera_src_task_leave(GstTask *task, GThread *thread, gpointer user_data)
> >  
> >  	GST_DEBUG_OBJECT(self, "Streaming thread is about to stop");
> >  
> > +	state->cam->stop();
> > +
> >  	for (GstPad *srcpad : state->srcpads)
> >  		gst_libcamera_pad_set_pool(srcpad, NULL);
> 
> s/NULL/nullptr/
> 
> >  
> >  	g_clear_object(&self->allocator);
> > +	g_clear_pointer(&self->flow_combiner,
> > +			(GDestroyNotify)gst_flow_combiner_free);
> >  }
> >  
> >  static void
> > @@ -340,6 +522,9 @@ gst_libcamera_src_change_state(GstElement *element, GstStateChange transition)
> >  			return GST_STATE_CHANGE_FAILURE;
> >  		ret = GST_STATE_CHANGE_NO_PREROLL;
> >  		break;
> > +	case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
> > +		gst_task_start(self->task);
> > +		break;
> >  	case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
> >  		ret = GST_STATE_CHANGE_NO_PREROLL;
> >  		break;
> > @@ -389,6 +574,9 @@ gst_libcamera_src_init(GstLibcameraSrc *self)
> >  
> >  	state->srcpads.push_back(gst_pad_new_from_template(templ, "src"));
> >  	gst_element_add_pad(GST_ELEMENT(self), state->srcpads[0]);
> > +
> > +	/* C-style friend. */
> > +	state->src = self;
> >  	self->state = state;
> >  }
> >  
-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 195 bytes
Desc: This is a digitally signed message part
URL: <https://lists.libcamera.org/pipermail/libcamera-devel/attachments/20200229/ac256a30/attachment-0001.sig>


More information about the libcamera-devel mailing list