[PATCH v1] gstreamer: pool: Replace GstAtomicQueue with deque and mutex
Nicolas Dufresne
nicolas.dufresne at collabora.com
Tue Jul 23 18:08:33 CEST 2024
Hi,
On Tuesday, 23 July 2024 at 16:59 +0100, Kieran Bingham wrote:
> Quoting Nicolas Dufresne (2024-06-05 20:41:20)
> > From: Nicolas Dufresne <nicolas.dufresne at collabora.com>
> >
> > The GstAtomicQueue only supports 2 threads, one pushing, and one popping. We
> > pop and push on error cases and we may have multiple threads downstream
> > returning buffer (using tee), which breaks this assumption.
> >
> > On top of which, the release function, that notify when the queue goes from
>
> s/notify/notifies/
>
> > empty to not-empty rely on a racy empty check. The downstream thread that
>
> s/rely on a/relies on an/
>
> > does this check if effectively concurrent with our thread calling acquire().
>
> s/if/is/
>
>
> > Fix this by replacing the GstAtomicQueue with a std::deque, and protect access
> > to that using the object lock.
> >
> > Bug: https://bugs.libcamera.org/show_bug.cgi?id=201
> > Signed-off-by: Nicolas Dufresne <nicolas.dufresne at collabora.com>
>
> This all looks reasonable to me - and following the bugzilla I see the
> comment : https://bugs.libcamera.org/show_bug.cgi?id=201#c10 which says
> it improved things, but that the issue might still persist (which could
> be a separate race/issue still?)
>
>
> Nicolas, what are your thoughts - do you think this patch is ready for
> merge already?
>
> For the code itself:
>
>
> Reviewed-by: Kieran Bingham <kieran.bingham at ideasonboard.com>
With the suggested typo fixes applied, yes, I'd pick it up, as it aligns the
code with similar fixes that happened upstream. I still have to find time to
look into the remaining issue, but that shouldn't block this one:
GstAtomicQueue, the way we use it now, must go away anyway.
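
For reference, the locking pattern the patch moves to can be sketched outside
of GStreamer roughly as below. This is only a minimal illustration, assuming
std::mutex in place of the GObject lock taken through GLibLocker; the
LockedQueue class and its method names are hypothetical and are not part of
libcamera or GStreamer. The point it shows is that the "was the queue empty"
check and the push happen under the same lock, so the empty-to-not-empty
transition cannot race with a concurrent pop.

```cpp
#include <deque>
#include <mutex>

// Hypothetical stand-in for the pool's buffer queue (not the libcamera code).
template <typename T>
class LockedQueue {
public:
	// Pop the oldest item, or return nullptr when the queue is empty.
	T *pop()
	{
		std::lock_guard<std::mutex> lock(mutex_);
		if (queue_.empty())
			return nullptr;

		T *item = queue_.front();
		queue_.pop_front();
		return item;
	}

	// Push an item. Returns true when the queue went from empty to
	// non-empty, which is when the caller should notify waiters.
	bool push(T *item)
	{
		std::lock_guard<std::mutex> lock(mutex_);
		bool wasEmpty = queue_.empty();
		queue_.push_back(item);
		return wasEmpty;
	}

private:
	std::mutex mutex_;
	std::deque<T *> queue_;
};
```

A caller would emit its notification only when push() returns true, and after
the lock has been dropped, which mirrors what release_buffer() does in the
patch with do_notify.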
Nicolas
>
> > ---
> > src/gstreamer/gstlibcamerapool.cpp | 40 +++++++++++++++++++++++-------
> > 1 file changed, 31 insertions(+), 9 deletions(-)
> >
> > diff --git a/src/gstreamer/gstlibcamerapool.cpp b/src/gstreamer/gstlibcamerapool.cpp
> > index 9661c67a..0b1a5689 100644
> > --- a/src/gstreamer/gstlibcamerapool.cpp
> > +++ b/src/gstreamer/gstlibcamerapool.cpp
> > @@ -8,6 +8,7 @@
> >
> > #include "gstlibcamerapool.h"
> >
> > +#include <deque>
> > #include <libcamera/stream.h>
> >
> > #include "gstlibcamera-utils.h"
> > @@ -24,24 +25,41 @@ static guint signals[N_SIGNALS];
> > struct _GstLibcameraPool {
> > GstBufferPool parent;
> >
> > - GstAtomicQueue *queue;
> > + std::deque<GstBuffer *> *queue;
> > GstLibcameraAllocator *allocator;
> > Stream *stream;
> > };
> >
> > G_DEFINE_TYPE(GstLibcameraPool, gst_libcamera_pool, GST_TYPE_BUFFER_POOL)
> >
> > +static GstBuffer *
> > +gst_libcamera_pool_pop_buffer(GstLibcameraPool *self)
> > +{
> > + GLibLocker lock(GST_OBJECT(self));
> > + GstBuffer *buf;
> > +
> > + if (self->queue->empty())
> > + return nullptr;
> > +
> > + buf = self->queue->front();
> > + self->queue->pop_front();
> > +
> > + return buf;
> > +}
> > +
> > static GstFlowReturn
> > gst_libcamera_pool_acquire_buffer(GstBufferPool *pool, GstBuffer **buffer,
> > [[maybe_unused]] GstBufferPoolAcquireParams *params)
> > {
> > GstLibcameraPool *self = GST_LIBCAMERA_POOL(pool);
> > - GstBuffer *buf = GST_BUFFER(gst_atomic_queue_pop(self->queue));
> > + GstBuffer *buf = gst_libcamera_pool_pop_buffer(self);
> > +
> > if (!buf)
> > return GST_FLOW_ERROR;
> >
> > if (!gst_libcamera_allocator_prepare_buffer(self->allocator, self->stream, buf)) {
> > - gst_atomic_queue_push(self->queue, buf);
> > + GLibLocker lock(GST_OBJECT(self));
> > + self->queue->push_back(buf);
> > return GST_FLOW_ERROR;
> > }
> >
> > @@ -64,9 +82,13 @@ static void
> > gst_libcamera_pool_release_buffer(GstBufferPool *pool, GstBuffer *buffer)
> > {
> > GstLibcameraPool *self = GST_LIBCAMERA_POOL(pool);
> > - bool do_notify = gst_atomic_queue_length(self->queue) == 0;
> > + bool do_notify;
> >
> > - gst_atomic_queue_push(self->queue, buffer);
> > + {
> > + GLibLocker lock(GST_OBJECT(self));
> > + do_notify = self->queue->empty();
> > + self->queue->push_back(buffer);
> > + }
> >
> > if (do_notify)
> > g_signal_emit(self, signals[SIGNAL_BUFFER_NOTIFY], 0);
> > @@ -75,7 +97,7 @@ gst_libcamera_pool_release_buffer(GstBufferPool *pool, GstBuffer *buffer)
> > static void
> > gst_libcamera_pool_init(GstLibcameraPool *self)
> > {
> > - self->queue = gst_atomic_queue_new(4);
> > + self->queue = new std::deque<GstBuffer *>();
> > }
> >
> > static void
> > @@ -84,10 +106,10 @@ gst_libcamera_pool_finalize(GObject *object)
> > GstLibcameraPool *self = GST_LIBCAMERA_POOL(object);
> > GstBuffer *buf;
> >
> > - while ((buf = GST_BUFFER(gst_atomic_queue_pop(self->queue))))
> > + while ((buf = gst_libcamera_pool_pop_buffer(self)))
> > gst_buffer_unref(buf);
> >
> > - gst_atomic_queue_unref(self->queue);
> > + delete self->queue;
> > g_object_unref(self->allocator);
> >
> > G_OBJECT_CLASS(gst_libcamera_pool_parent_class)->finalize(object);
> > @@ -122,7 +144,7 @@ gst_libcamera_pool_new(GstLibcameraAllocator *allocator, Stream *stream)
> > gsize pool_size = gst_libcamera_allocator_get_pool_size(allocator, stream);
> > for (gsize i = 0; i < pool_size; i++) {
> > GstBuffer *buffer = gst_buffer_new();
> > - gst_atomic_queue_push(pool->queue, buffer);
> > + pool->queue->push_back(buffer);
> > }
> >
> > return pool;
> > --
> > 2.45.1
> >