[PATCH v1] gstreamer: pool: Replace GstAtomicQueue with deque and mutex

Kieran Bingham kieran.bingham at ideasonboard.com
Tue Jul 23 17:59:12 CEST 2024


Quoting Nicolas Dufresne (2024-06-05 20:41:20)
> From: Nicolas Dufresne <nicolas.dufresne at collabora.com>
> 
> The GstAtomicQueue only supports 2 threads, one pushing, and one popping. We
> pop and push on error cases and we may have multiple threads downstream
> returning buffer (using tee), which breaks this assumption.
> 
> On top of which, the release function, that notify when the queue goes from

s/notify/notifies/

> empty to not-empty rely on a racy empty check. The downstream thread that

s/rely on a/relies on an/

> does this check if effectively concurrent with our thread calling acquire().

s/if/is/


> Fix this by replacing the GstAtomicQueue with a std::deque, and protect access
> to that using the object lock.
> 
> Bug: https://bugs.libcamera.org/show_bug.cgi?id=201
> Signed-off-by: Nicolas Dufresne <nicolas.dufresne at collabora.com>

This all looks reasonable to me. Following the bugzilla, I see comment
https://bugs.libcamera.org/show_bug.cgi?id=201#c10, which says that this
patch improved things but that the issue might still persist (which could
be a separate race/issue?).


Nicolas, what are your thoughts - do you think this patch is ready to be
merged as it is?

For the code itself:


Reviewed-by: Kieran Bingham <kieran.bingham at ideasonboard.com>

> ---
>  src/gstreamer/gstlibcamerapool.cpp | 40 +++++++++++++++++++++++-------
>  1 file changed, 31 insertions(+), 9 deletions(-)
> 
> diff --git a/src/gstreamer/gstlibcamerapool.cpp b/src/gstreamer/gstlibcamerapool.cpp
> index 9661c67a..0b1a5689 100644
> --- a/src/gstreamer/gstlibcamerapool.cpp
> +++ b/src/gstreamer/gstlibcamerapool.cpp
> @@ -8,6 +8,7 @@
>  
>  #include "gstlibcamerapool.h"
>  
> +#include <deque>
>  #include <libcamera/stream.h>
>  
>  #include "gstlibcamera-utils.h"
> @@ -24,24 +25,41 @@ static guint signals[N_SIGNALS];
>  struct _GstLibcameraPool {
>         GstBufferPool parent;
>  
> -       GstAtomicQueue *queue;
> +       std::deque<GstBuffer *> *queue;
>         GstLibcameraAllocator *allocator;
>         Stream *stream;
>  };
>  
>  G_DEFINE_TYPE(GstLibcameraPool, gst_libcamera_pool, GST_TYPE_BUFFER_POOL)
>  
> +static GstBuffer *
> +gst_libcamera_pool_pop_buffer(GstLibcameraPool *self)
> +{
> +       GLibLocker lock(GST_OBJECT(self));
> +       GstBuffer *buf;
> +
> +       if (self->queue->empty())
> +               return nullptr;
> +
> +       buf = self->queue->front();
> +       self->queue->pop_front();
> +
> +       return buf;
> +}
> +
>  static GstFlowReturn
>  gst_libcamera_pool_acquire_buffer(GstBufferPool *pool, GstBuffer **buffer,
>                                   [[maybe_unused]] GstBufferPoolAcquireParams *params)
>  {
>         GstLibcameraPool *self = GST_LIBCAMERA_POOL(pool);
> -       GstBuffer *buf = GST_BUFFER(gst_atomic_queue_pop(self->queue));
> +       GstBuffer *buf = gst_libcamera_pool_pop_buffer(self);
> +
>         if (!buf)
>                 return GST_FLOW_ERROR;
>  
>         if (!gst_libcamera_allocator_prepare_buffer(self->allocator, self->stream, buf)) {
> -               gst_atomic_queue_push(self->queue, buf);
> +               GLibLocker lock(GST_OBJECT(self));
> +               self->queue->push_back(buf);
>                 return GST_FLOW_ERROR;
>         }
>  
> @@ -64,9 +82,13 @@ static void
>  gst_libcamera_pool_release_buffer(GstBufferPool *pool, GstBuffer *buffer)
>  {
>         GstLibcameraPool *self = GST_LIBCAMERA_POOL(pool);
> -       bool do_notify = gst_atomic_queue_length(self->queue) == 0;
> +       bool do_notify;
>  
> -       gst_atomic_queue_push(self->queue, buffer);
> +       {
> +               GLibLocker lock(GST_OBJECT(self));
> +               do_notify = self->queue->empty();
> +               self->queue->push_back(buffer);
> +       }
>  
>         if (do_notify)
>                 g_signal_emit(self, signals[SIGNAL_BUFFER_NOTIFY], 0);
> @@ -75,7 +97,7 @@ gst_libcamera_pool_release_buffer(GstBufferPool *pool, GstBuffer *buffer)
>  static void
>  gst_libcamera_pool_init(GstLibcameraPool *self)
>  {
> -       self->queue = gst_atomic_queue_new(4);
> +       self->queue = new std::deque<GstBuffer *>();
>  }
>  
>  static void
> @@ -84,10 +106,10 @@ gst_libcamera_pool_finalize(GObject *object)
>         GstLibcameraPool *self = GST_LIBCAMERA_POOL(object);
>         GstBuffer *buf;
>  
> -       while ((buf = GST_BUFFER(gst_atomic_queue_pop(self->queue))))
> +       while ((buf = gst_libcamera_pool_pop_buffer(self)))
>                 gst_buffer_unref(buf);
>  
> -       gst_atomic_queue_unref(self->queue);
> +       delete self->queue;
>         g_object_unref(self->allocator);
>  
>         G_OBJECT_CLASS(gst_libcamera_pool_parent_class)->finalize(object);
> @@ -122,7 +144,7 @@ gst_libcamera_pool_new(GstLibcameraAllocator *allocator, Stream *stream)
>         gsize pool_size = gst_libcamera_allocator_get_pool_size(allocator, stream);
>         for (gsize i = 0; i < pool_size; i++) {
>                 GstBuffer *buffer = gst_buffer_new();
> -               gst_atomic_queue_push(pool->queue, buffer);
> +               pool->queue->push_back(buffer);
>         }
>  
>         return pool;
> -- 
> 2.45.1
>


More information about the libcamera-devel mailing list