[libcamera-devel] [PATCH v2 12/12] gstreamer: Fix race conditions in task pause/resume
Umang Jain
umang.jain at ideasonboard.com
Thu Jun 30 12:01:57 CEST 2022
Hi Laurent,
(still working through this patch)...
On 6/30/22 05:32, Laurent Pinchart via libcamera-devel wrote:
> The task run function races with two other threads that want to resume
> the task: the requestCompleted() handler and the buffer-notify signal
What do you mean by "two other threads" ?
I think the "buffer-notify" signal is emitted in the task-thread itself.
Do gst_libcamera_src_task_enter() and the task function itself (i.e.
gst_libcamera_src_task_run()) run in different threads? I think it's the
same thread.
If my understanding above is correct, resuming the task races only with
the requestCompleted() handler thread.
Of course, I still need to see how task_resume affects task_run when
called from different threads, but I am still trying to grasp the issue.
> handler. If the former queues completed requests or the latter queues
> back buffers to the pool, and then resumes the task, after the task run
> handler checks the queues but before it attempts to pause the task, then
> the task may be paused without noticing that more work is available.
>
> The most immediate way to fix this is to take the stream_lock in the
> requestCompleted() and buffer-notify signal handlers, or cover the whole
> task run handler with the GstLibcameraSrcState lock. This could cause
> long delays in the requestCompleted() handler, so that's not a good
> option.
>
> Instead, add a wakeup flag, protected by the GstLibcameraSrcState lock,
> that allows detection of a lost race, and retry the task run.
>
> Signed-off-by: Laurent Pinchart <laurent.pinchart at ideasonboard.com>
> ---
> Changes since v1:
>
> - Fix incorrect wakeup and pause logic
> ---
> src/gstreamer/gstlibcamerasrc.cpp | 84 +++++++++++++++++++++++--------
> 1 file changed, 63 insertions(+), 21 deletions(-)
>
> diff --git a/src/gstreamer/gstlibcamerasrc.cpp b/src/gstreamer/gstlibcamerasrc.cpp
> index 9ea59631a9f2..5471ab951252 100644
> --- a/src/gstreamer/gstlibcamerasrc.cpp
> +++ b/src/gstreamer/gstlibcamerasrc.cpp
> @@ -118,7 +118,7 @@ struct GstLibcameraSrcState {
> /*
> * Contention on this lock_ must be minimized, as it has to be taken in
> * the realtime-sensitive requestCompleted() handler to protect
> - * queuedRequests_ and completedRequests_.
> + * queuedRequests_, completedRequests_ and wakeup_.
> *
> * stream_lock must be taken before lock_ in contexts where both locks
> * need to be taken. In particular, this means that the lock_ must not
> @@ -130,6 +130,7 @@ struct GstLibcameraSrcState {
> LIBCAMERA_TSA_GUARDED_BY(lock_);
> std::queue<std::unique_ptr<RequestWrap>> completedRequests_
> LIBCAMERA_TSA_GUARDED_BY(lock_);
> + bool wakeup_ LIBCAMERA_TSA_GUARDED_BY(lock_);
>
> guint group_id_;
>
> @@ -250,15 +251,17 @@ GstLibcameraSrcState::requestCompleted(Request *request)
> {
> MutexLocker locker(lock_);
> completedRequests_.push(std::move(wrap));
> - }
> + wakeup_ = true;
>
> - gst_task_resume(src_->task);
> + gst_task_resume(src_->task);
> + }
> }
>
> /* Must be called with stream_lock held. */
> int GstLibcameraSrcState::processRequest()
> {
> std::unique_ptr<RequestWrap> wrap;
> + int err = 0;
>
> {
> MutexLocker locker(lock_);
> @@ -267,10 +270,13 @@ int GstLibcameraSrcState::processRequest()
> wrap = std::move(completedRequests_.front());
> completedRequests_.pop();
> }
> +
> + if (completedRequests_.empty())
> + err = -ENOBUFS;
> }
>
> if (!wrap)
> - return -ENODATA;
> + return -ENOBUFS;
>
> GstFlowReturn ret = GST_FLOW_OK;
> gst_flow_combiner_reset(src_->flow_combiner);
> @@ -310,7 +316,7 @@ int GstLibcameraSrcState::processRequest()
> return -EPIPE;
> }
>
> - return 0;
> + return err;
> }
>
> static bool
> @@ -374,53 +380,88 @@ gst_libcamera_src_open(GstLibcameraSrc *self)
> return true;
> }
>
> +static void
> +gst_libcamera_src_task_resume(gpointer user_data)
> +{
> + GstLibcameraSrc *self = GST_LIBCAMERA_SRC(user_data);
> + GstLibcameraSrcState *state = self->state;
> +
> + MutexLocker locker(state->lock_);
> + state->wakeup_ = true;
> + gst_task_resume(self->task);
> +}
> +
> static void
> gst_libcamera_src_task_run(gpointer user_data)
> {
> GstLibcameraSrc *self = GST_LIBCAMERA_SRC(user_data);
> GstLibcameraSrcState *state = self->state;
>
> + {
> + MutexLocker locker(state->lock_);
> + state->wakeup_ = false;
> + }
> +
> + bool doPause = true;
> +
> /*
> * Create and queue one request. If no buffers are available the
> * function returns -ENOBUFS, which we ignore here as that's not a
> * fatal error.
> */
> int ret = state->queueRequest();
> - if (ret == -ENOMEM) {
> + switch (ret) {
> + case 0:
> + /*
> + * The request was successfully queued, there may be enough
> + * buffers to create a new one. Don't pause the task to give it
> + * another try.
> + */
> + doPause = false;
> + break;
> +
> + case -ENOMEM:
> GST_ELEMENT_ERROR(self, RESOURCE, NO_SPACE_LEFT,
> ("Failed to allocate request for camera '%s'.",
> state->cam_->id().c_str()),
> ("libcamera::Camera::createRequest() failed"));
> gst_task_stop(self->task);
> return;
> +
> + case -ENOBUFS:
> + default:
> + break;
> }
>
> - /* Process one completed request, if available. */
> + /*
> + * Process one completed request, if available, and record if further
> + * requests are ready for processing.
> + */
> ret = state->processRequest();
> switch (ret) {
> + case -ENOBUFS:
> + doPause = false;
> + break;
> +
> case -EPIPE:
> gst_task_stop(self->task);
> return;
>
> - case -ENODATA:
> - gst_task_pause(self->task);
> - return;
> + case 0:
> + default:
> + break;
> }
>
> /*
> - * Here we need to decide if we want to pause. This needs to
> - * happen in lock step with the callback thread which may want
> - * to resume the task and might push pending buffers.
> + * Here we need to decide if we want to pause. This needs to happen in
> + * lock step with the requestCompleted callback and the buffer-notify
> + * signal handler that resume the task.
> */
> - bool do_pause;
> -
> - {
> + if (doPause) {
> MutexLocker locker(state->lock_);
> - do_pause = state->completedRequests_.empty();
> + if (!state->wakeup_)
> + gst_task_pause(self->task);
> }
> -
> - if (do_pause)
> - gst_task_pause(self->task);
> }
>
> static void
> @@ -531,7 +572,8 @@ gst_libcamera_src_task_enter(GstTask *task, [[maybe_unused]] GThread *thread,
> GstLibcameraPool *pool = gst_libcamera_pool_new(self->allocator,
> stream_cfg.stream());
> g_signal_connect_swapped(pool, "buffer-notify",
> - G_CALLBACK(gst_task_resume), task);
> + G_CALLBACK(gst_libcamera_src_task_resume),
> + self);
>
> gst_libcamera_pad_set_pool(srcpad, pool);
> gst_flow_combiner_add_pad(self->flow_combiner, srcpad);
More information about the libcamera-devel
mailing list