[PATCH v1] gstreamer: pool: Replace GstAtomicQueue with deque and mutex

Nicolas Dufresne nicolas.dufresne at collabora.com
Tue Jul 23 18:08:33 CEST 2024


Hi,

Le mardi 23 juillet 2024 à 16:59 +0100, Kieran Bingham a écrit :
> Quoting Nicolas Dufresne (2024-06-05 20:41:20)
> > From: Nicolas Dufresne <nicolas.dufresne at collabora.com>
> > 
> > The GstAtomicQueue only supports 2 threads, one pushing, and one popping. We
> > pop and push on error cases and we may have multiple threads downstream
> > returning buffer (using tee), which breaks this assumption.
> > 
> > On top of which, the release function, that notify when the queue goes from
> 
> s/notify/notifies/
> 
> > empty to not-empty rely on a racy empty check. The downstream thread that
> 
> s/rely on a/relies on an/
> 
> > does this check if effectively concurrent with our thread calling acquire().
> 
> s/if/is/
> 
> 
> > Fix this by replacing the GstAtomicQueue with a std::deque, and protect access
> > to that using the object lock.
> > 
> > Bug: https://bugs.libcamera.org/show_bug.cgi?id=201
> > Signed-off-by: Nicolas Dufresne <nicolas.dufresne at collabora.com>
> 
> This all looks reasonable to me - and following the bugzilla I see the
> comment : https://bugs.libcamera.org/show_bug.cgi?id=201#c10 which says
> it improved things, but that the issue might still persist (which could
> be a separate race/issue still?)
> 
> 
> Nicolas, what are your thoughts - do you think this patch is ready for
> merge already?
> 
> For the code itself:
> 
> 
> Reviewed-by: Kieran Bingham <kieran.bingham at ideasonboard.com>

With the suggested typo, yes, I'd picked it up as it aligns the code with
similar fixes that happen upstream. I still have to find time to look into the
remaining, but shouldn't block this one, GstAtomicQueue the way we use it now
must go away anyway.

Nicolas

> 
> > ---
> >  src/gstreamer/gstlibcamerapool.cpp | 40 +++++++++++++++++++++++-------
> >  1 file changed, 31 insertions(+), 9 deletions(-)
> > 
> > diff --git a/src/gstreamer/gstlibcamerapool.cpp b/src/gstreamer/gstlibcamerapool.cpp
> > index 9661c67a..0b1a5689 100644
> > --- a/src/gstreamer/gstlibcamerapool.cpp
> > +++ b/src/gstreamer/gstlibcamerapool.cpp
> > @@ -8,6 +8,7 @@
> >  
> >  #include "gstlibcamerapool.h"
> >  
> > +#include <deque>
> >  #include <libcamera/stream.h>
> >  
> >  #include "gstlibcamera-utils.h"
> > @@ -24,24 +25,41 @@ static guint signals[N_SIGNALS];
> >  struct _GstLibcameraPool {
> >         GstBufferPool parent;
> >  
> > -       GstAtomicQueue *queue;
> > +       std::deque<GstBuffer *> *queue;
> >         GstLibcameraAllocator *allocator;
> >         Stream *stream;
> >  };
> >  
> >  G_DEFINE_TYPE(GstLibcameraPool, gst_libcamera_pool, GST_TYPE_BUFFER_POOL)
> >  
> > +static GstBuffer *
> > +gst_libcamera_pool_pop_buffer(GstLibcameraPool *self)
> > +{
> > +       GLibLocker lock(GST_OBJECT(self));
> > +       GstBuffer *buf;
> > +
> > +       if (self->queue->empty())
> > +               return nullptr;
> > +
> > +       buf = self->queue->front();
> > +       self->queue->pop_front();
> > +
> > +       return buf;
> > +}
> > +
> >  static GstFlowReturn
> >  gst_libcamera_pool_acquire_buffer(GstBufferPool *pool, GstBuffer **buffer,
> >                                   [[maybe_unused]] GstBufferPoolAcquireParams *params)
> >  {
> >         GstLibcameraPool *self = GST_LIBCAMERA_POOL(pool);
> > -       GstBuffer *buf = GST_BUFFER(gst_atomic_queue_pop(self->queue));
> > +       GstBuffer *buf = gst_libcamera_pool_pop_buffer(self);
> > +
> >         if (!buf)
> >                 return GST_FLOW_ERROR;
> >  
> >         if (!gst_libcamera_allocator_prepare_buffer(self->allocator, self->stream, buf)) {
> > -               gst_atomic_queue_push(self->queue, buf);
> > +               GLibLocker lock(GST_OBJECT(self));
> > +               self->queue->push_back(buf);
> >                 return GST_FLOW_ERROR;
> >         }
> >  
> > @@ -64,9 +82,13 @@ static void
> >  gst_libcamera_pool_release_buffer(GstBufferPool *pool, GstBuffer *buffer)
> >  {
> >         GstLibcameraPool *self = GST_LIBCAMERA_POOL(pool);
> > -       bool do_notify = gst_atomic_queue_length(self->queue) == 0;
> > +       bool do_notify;
> >  
> > -       gst_atomic_queue_push(self->queue, buffer);
> > +       {
> > +               GLibLocker lock(GST_OBJECT(self));
> > +               do_notify = self->queue->empty();
> > +               self->queue->push_back(buffer);
> > +       }
> >  
> >         if (do_notify)
> >                 g_signal_emit(self, signals[SIGNAL_BUFFER_NOTIFY], 0);
> > @@ -75,7 +97,7 @@ gst_libcamera_pool_release_buffer(GstBufferPool *pool, GstBuffer *buffer)
> >  static void
> >  gst_libcamera_pool_init(GstLibcameraPool *self)
> >  {
> > -       self->queue = gst_atomic_queue_new(4);
> > +       self->queue = new std::deque<GstBuffer *>();
> >  }
> >  
> >  static void
> > @@ -84,10 +106,10 @@ gst_libcamera_pool_finalize(GObject *object)
> >         GstLibcameraPool *self = GST_LIBCAMERA_POOL(object);
> >         GstBuffer *buf;
> >  
> > -       while ((buf = GST_BUFFER(gst_atomic_queue_pop(self->queue))))
> > +       while ((buf = gst_libcamera_pool_pop_buffer(self)))
> >                 gst_buffer_unref(buf);
> >  
> > -       gst_atomic_queue_unref(self->queue);
> > +       delete self->queue;
> >         g_object_unref(self->allocator);
> >  
> >         G_OBJECT_CLASS(gst_libcamera_pool_parent_class)->finalize(object);
> > @@ -122,7 +144,7 @@ gst_libcamera_pool_new(GstLibcameraAllocator *allocator, Stream *stream)
> >         gsize pool_size = gst_libcamera_allocator_get_pool_size(allocator, stream);
> >         for (gsize i = 0; i < pool_size; i++) {
> >                 GstBuffer *buffer = gst_buffer_new();
> > -               gst_atomic_queue_push(pool->queue, buffer);
> > +               pool->queue->push_back(buffer);
> >         }
> >  
> >         return pool;
> > -- 
> > 2.45.1
> > 



More information about the libcamera-devel mailing list