[PATCH v1] gstreamer: pool: Replace GstAtomicQueue with deque and mutex
Kieran Bingham
kieran.bingham at ideasonboard.com
Tue Jul 23 17:59:12 CEST 2024
Quoting Nicolas Dufresne (2024-06-05 20:41:20)
> From: Nicolas Dufresne <nicolas.dufresne at collabora.com>
>
> The GstAtomicQueue only supports 2 threads, one pushing, and one popping. We
> pop and push on error cases and we may have multiple threads downstream
> returning buffer (using tee), which breaks this assumption.
>
> On top of which, the release function, that notify when the queue goes from
s/notify/notifies/
> empty to not-empty rely on a racy empty check. The downstream thread that
s/rely on a/relies on an/
> does this check if effectively concurrent with our thread calling acquire().
s/if/is/
> Fix this by replacing the GstAtomicQueue with a std::deque, and protect access
> to that using the object lock.
>
> Bug: https://bugs.libcamera.org/show_bug.cgi?id=201
> Signed-off-by: Nicolas Dufresne <nicolas.dufresne at collabora.com>
This all looks reasonable to me. Following the Bugzilla entry, I see the
comment at https://bugs.libcamera.org/show_bug.cgi?id=201#c10, which says
this patch improved things but that the issue might still persist (which
could be a separate race/issue?).
Nicolas, what are your thoughts? Do you think this patch is ready to be
merged as it is?
For the code itself:
Reviewed-by: Kieran Bingham <kieran.bingham at ideasonboard.com>
> ---
> src/gstreamer/gstlibcamerapool.cpp | 40 +++++++++++++++++++++++-------
> 1 file changed, 31 insertions(+), 9 deletions(-)
>
> diff --git a/src/gstreamer/gstlibcamerapool.cpp b/src/gstreamer/gstlibcamerapool.cpp
> index 9661c67a..0b1a5689 100644
> --- a/src/gstreamer/gstlibcamerapool.cpp
> +++ b/src/gstreamer/gstlibcamerapool.cpp
> @@ -8,6 +8,7 @@
>
> #include "gstlibcamerapool.h"
>
> +#include <deque>
> #include <libcamera/stream.h>
>
> #include "gstlibcamera-utils.h"
> @@ -24,24 +25,41 @@ static guint signals[N_SIGNALS];
> struct _GstLibcameraPool {
> GstBufferPool parent;
>
> - GstAtomicQueue *queue;
> + std::deque<GstBuffer *> *queue;
> GstLibcameraAllocator *allocator;
> Stream *stream;
> };
>
> G_DEFINE_TYPE(GstLibcameraPool, gst_libcamera_pool, GST_TYPE_BUFFER_POOL)
>
> +static GstBuffer *
> +gst_libcamera_pool_pop_buffer(GstLibcameraPool *self)
> +{
> + GLibLocker lock(GST_OBJECT(self));
> + GstBuffer *buf;
> +
> + if (self->queue->empty())
> + return nullptr;
> +
> + buf = self->queue->front();
> + self->queue->pop_front();
> +
> + return buf;
> +}
> +
> static GstFlowReturn
> gst_libcamera_pool_acquire_buffer(GstBufferPool *pool, GstBuffer **buffer,
> [[maybe_unused]] GstBufferPoolAcquireParams *params)
> {
> GstLibcameraPool *self = GST_LIBCAMERA_POOL(pool);
> - GstBuffer *buf = GST_BUFFER(gst_atomic_queue_pop(self->queue));
> + GstBuffer *buf = gst_libcamera_pool_pop_buffer(self);
> +
> if (!buf)
> return GST_FLOW_ERROR;
>
> if (!gst_libcamera_allocator_prepare_buffer(self->allocator, self->stream, buf)) {
> - gst_atomic_queue_push(self->queue, buf);
> + GLibLocker lock(GST_OBJECT(self));
> + self->queue->push_back(buf);
> return GST_FLOW_ERROR;
> }
>
> @@ -64,9 +82,13 @@ static void
> gst_libcamera_pool_release_buffer(GstBufferPool *pool, GstBuffer *buffer)
> {
> GstLibcameraPool *self = GST_LIBCAMERA_POOL(pool);
> - bool do_notify = gst_atomic_queue_length(self->queue) == 0;
> + bool do_notify;
>
> - gst_atomic_queue_push(self->queue, buffer);
> + {
> + GLibLocker lock(GST_OBJECT(self));
> + do_notify = self->queue->empty();
> + self->queue->push_back(buffer);
> + }
>
> if (do_notify)
> g_signal_emit(self, signals[SIGNAL_BUFFER_NOTIFY], 0);
> @@ -75,7 +97,7 @@ gst_libcamera_pool_release_buffer(GstBufferPool *pool, GstBuffer *buffer)
> static void
> gst_libcamera_pool_init(GstLibcameraPool *self)
> {
> - self->queue = gst_atomic_queue_new(4);
> + self->queue = new std::deque<GstBuffer *>();
> }
>
> static void
> @@ -84,10 +106,10 @@ gst_libcamera_pool_finalize(GObject *object)
> GstLibcameraPool *self = GST_LIBCAMERA_POOL(object);
> GstBuffer *buf;
>
> - while ((buf = GST_BUFFER(gst_atomic_queue_pop(self->queue))))
> + while ((buf = gst_libcamera_pool_pop_buffer(self)))
> gst_buffer_unref(buf);
>
> - gst_atomic_queue_unref(self->queue);
> + delete self->queue;
> g_object_unref(self->allocator);
>
> G_OBJECT_CLASS(gst_libcamera_pool_parent_class)->finalize(object);
> @@ -122,7 +144,7 @@ gst_libcamera_pool_new(GstLibcameraAllocator *allocator, Stream *stream)
> gsize pool_size = gst_libcamera_allocator_get_pool_size(allocator, stream);
> for (gsize i = 0; i < pool_size; i++) {
> GstBuffer *buffer = gst_buffer_new();
> - gst_atomic_queue_push(pool->queue, buffer);
> + pool->queue->push_back(buffer);
> }
>
> return pool;
> --
> 2.45.1
>
More information about the libcamera-devel
mailing list