[PATCH v1] gstreamer: pool: Replace GstAtomicQueue with deque and mutex

Jacopo Mondi jacopo.mondi at ideasonboard.com
Thu Jul 25 10:55:30 CEST 2024


Hi Nicolas

On Wed, Jun 05, 2024 at 03:41:20PM GMT, Nicolas Dufresne wrote:
> From: Nicolas Dufresne <nicolas.dufresne at collabora.com>
>
> The GstAtomicQueue only supports 2 threads, one pushing, and one popping. We
> pop and push on error cases and we may have multiple threads downstream
> returning buffer (using tee), which breaks this assumption.
>
> On top of which, the release function, that notify when the queue goes from
> empty to not-empty rely on a racy empty check. The downstream thread that
> does this check if effectively concurrent with our thread calling acquire().
>
> Fix this by replacing the GstAtomicQueue with a std::deque, and protect access
> to that using the object lock.
>
> Bug: https://bugs.libcamera.org/show_bug.cgi?id=201
> Signed-off-by: Nicolas Dufresne <nicolas.dufresne at collabora.com>

Looks sane to me even if the gst code base is not familiar to me
Acked-by: Jacopo Mondi <jacopo.mondi at ideasonboard.com>

Thanks
  j
> ---
>  src/gstreamer/gstlibcamerapool.cpp | 40 +++++++++++++++++++++++-------
>  1 file changed, 31 insertions(+), 9 deletions(-)
>
> diff --git a/src/gstreamer/gstlibcamerapool.cpp b/src/gstreamer/gstlibcamerapool.cpp
> index 9661c67a..0b1a5689 100644
> --- a/src/gstreamer/gstlibcamerapool.cpp
> +++ b/src/gstreamer/gstlibcamerapool.cpp
> @@ -8,6 +8,7 @@
>
>  #include "gstlibcamerapool.h"
>
> +#include <deque>
>  #include <libcamera/stream.h>
>
>  #include "gstlibcamera-utils.h"
> @@ -24,24 +25,41 @@ static guint signals[N_SIGNALS];
>  struct _GstLibcameraPool {
>  	GstBufferPool parent;
>
> -	GstAtomicQueue *queue;
> +	std::deque<GstBuffer *> *queue;
>  	GstLibcameraAllocator *allocator;
>  	Stream *stream;
>  };
>
>  G_DEFINE_TYPE(GstLibcameraPool, gst_libcamera_pool, GST_TYPE_BUFFER_POOL)
>
> +static GstBuffer *
> +gst_libcamera_pool_pop_buffer(GstLibcameraPool *self)
> +{
> +	GLibLocker lock(GST_OBJECT(self));
> +	GstBuffer *buf;
> +
> +	if (self->queue->empty())
> +		return nullptr;
> +
> +	buf = self->queue->front();
> +	self->queue->pop_front();
> +
> +	return buf;
> +}
> +
>  static GstFlowReturn
>  gst_libcamera_pool_acquire_buffer(GstBufferPool *pool, GstBuffer **buffer,
>  				  [[maybe_unused]] GstBufferPoolAcquireParams *params)
>  {
>  	GstLibcameraPool *self = GST_LIBCAMERA_POOL(pool);
> -	GstBuffer *buf = GST_BUFFER(gst_atomic_queue_pop(self->queue));
> +	GstBuffer *buf = gst_libcamera_pool_pop_buffer(self);
> +
>  	if (!buf)
>  		return GST_FLOW_ERROR;
>
>  	if (!gst_libcamera_allocator_prepare_buffer(self->allocator, self->stream, buf)) {
> -		gst_atomic_queue_push(self->queue, buf);
> +		GLibLocker lock(GST_OBJECT(self));
> +		self->queue->push_back(buf);
>  		return GST_FLOW_ERROR;
>  	}
>
> @@ -64,9 +82,13 @@ static void
>  gst_libcamera_pool_release_buffer(GstBufferPool *pool, GstBuffer *buffer)
>  {
>  	GstLibcameraPool *self = GST_LIBCAMERA_POOL(pool);
> -	bool do_notify = gst_atomic_queue_length(self->queue) == 0;
> +	bool do_notify;
>
> -	gst_atomic_queue_push(self->queue, buffer);
> +	{
> +		GLibLocker lock(GST_OBJECT(self));
> +		do_notify = self->queue->empty();
> +		self->queue->push_back(buffer);
> +	}
>
>  	if (do_notify)
>  		g_signal_emit(self, signals[SIGNAL_BUFFER_NOTIFY], 0);
> @@ -75,7 +97,7 @@ gst_libcamera_pool_release_buffer(GstBufferPool *pool, GstBuffer *buffer)
>  static void
>  gst_libcamera_pool_init(GstLibcameraPool *self)
>  {
> -	self->queue = gst_atomic_queue_new(4);
> +	self->queue = new std::deque<GstBuffer *>();
>  }
>
>  static void
> @@ -84,10 +106,10 @@ gst_libcamera_pool_finalize(GObject *object)
>  	GstLibcameraPool *self = GST_LIBCAMERA_POOL(object);
>  	GstBuffer *buf;
>
> -	while ((buf = GST_BUFFER(gst_atomic_queue_pop(self->queue))))
> +	while ((buf = gst_libcamera_pool_pop_buffer(self)))
>  		gst_buffer_unref(buf);
>
> -	gst_atomic_queue_unref(self->queue);
> +	delete self->queue;
>  	g_object_unref(self->allocator);
>
>  	G_OBJECT_CLASS(gst_libcamera_pool_parent_class)->finalize(object);
> @@ -122,7 +144,7 @@ gst_libcamera_pool_new(GstLibcameraAllocator *allocator, Stream *stream)
>  	gsize pool_size = gst_libcamera_allocator_get_pool_size(allocator, stream);
>  	for (gsize i = 0; i < pool_size; i++) {
>  		GstBuffer *buffer = gst_buffer_new();
> -		gst_atomic_queue_push(pool->queue, buffer);
> +		pool->queue->push_back(buffer);
>  	}
>
>  	return pool;
> --
> 2.45.1
>


More information about the libcamera-devel mailing list