[libcamera-devel] [PATCH v2 12/12] gstreamer: Fix race conditions in task pause/resume

Laurent Pinchart laurent.pinchart at ideasonboard.com
Thu Jun 30 02:02:51 CEST 2022


The task run function races with two other threads that want to resume
the task: the requestCompleted() handler and the buffer-notify signal
handler. If the former queues a completed request, or the latter returns
a buffer to the pool, and then resumes the task after the task run
handler has checked the queues but before it attempts to pause the task,
the task may be paused without noticing that more work is available.

The most immediate way to fix this is to take the stream_lock in the
requestCompleted() and buffer-notify signal handlers, or cover the whole
task run handler with the GstLibcameraSrcState lock. Either approach
could cause long delays in the requestCompleted() handler, so neither
is a good option.

Instead, add a wakeup flag, protected by the GstLibcameraSrcState lock,
that allows detection of a lost race. When the race is lost, the task
run is retried instead of the task being paused.

Signed-off-by: Laurent Pinchart <laurent.pinchart at ideasonboard.com>
---
Changes since v1:

- Fix incorrect wakeup and pause logic
---
 src/gstreamer/gstlibcamerasrc.cpp | 84 +++++++++++++++++++++++--------
 1 file changed, 63 insertions(+), 21 deletions(-)

diff --git a/src/gstreamer/gstlibcamerasrc.cpp b/src/gstreamer/gstlibcamerasrc.cpp
index 9ea59631a9f2..5471ab951252 100644
--- a/src/gstreamer/gstlibcamerasrc.cpp
+++ b/src/gstreamer/gstlibcamerasrc.cpp
@@ -118,7 +118,7 @@ struct GstLibcameraSrcState {
 	/*
 	 * Contention on this lock_ must be minimized, as it has to be taken in
 	 * the realtime-sensitive requestCompleted() handler to protect
-	 * queuedRequests_ and completedRequests_.
+	 * queuedRequests_, completedRequests_ and wakeup_.
 	 *
 	 * stream_lock must be taken before lock_ in contexts where both locks
 	 * need to be taken. In particular, this means that the lock_ must not
@@ -130,6 +130,7 @@ struct GstLibcameraSrcState {
 		LIBCAMERA_TSA_GUARDED_BY(lock_);
 	std::queue<std::unique_ptr<RequestWrap>> completedRequests_
 		LIBCAMERA_TSA_GUARDED_BY(lock_);
+	bool wakeup_ LIBCAMERA_TSA_GUARDED_BY(lock_);
 
 	guint group_id_;
 
@@ -250,15 +251,17 @@ GstLibcameraSrcState::requestCompleted(Request *request)
 	{
 		MutexLocker locker(lock_);
 		completedRequests_.push(std::move(wrap));
-	}
+		wakeup_ = true;
 
-	gst_task_resume(src_->task);
+		gst_task_resume(src_->task);
+	}
 }
 
 /* Must be called with stream_lock held. */
 int GstLibcameraSrcState::processRequest()
 {
 	std::unique_ptr<RequestWrap> wrap;
+	int err = 0;
 
 	{
 		MutexLocker locker(lock_);
@@ -267,10 +270,13 @@ int GstLibcameraSrcState::processRequest()
 			wrap = std::move(completedRequests_.front());
 			completedRequests_.pop();
 		}
+
+		if (completedRequests_.empty())
+			err = -ENOBUFS;
 	}
 
 	if (!wrap)
-		return -ENODATA;
+		return -ENOBUFS;
 
 	GstFlowReturn ret = GST_FLOW_OK;
 	gst_flow_combiner_reset(src_->flow_combiner);
@@ -310,7 +316,7 @@ int GstLibcameraSrcState::processRequest()
 		return -EPIPE;
 	}
 
-	return 0;
+	return err;
 }
 
 static bool
@@ -374,53 +380,88 @@ gst_libcamera_src_open(GstLibcameraSrc *self)
 	return true;
 }
 
+static void
+gst_libcamera_src_task_resume(gpointer user_data)
+{
+	GstLibcameraSrc *self = GST_LIBCAMERA_SRC(user_data);
+
+	/* Flag the wakeup so that a concurrent task run won't pause itself. */
+	MutexLocker locker(self->state->lock_);
+	self->state->wakeup_ = true;
+	gst_task_resume(self->task);
+}
+
 static void
 gst_libcamera_src_task_run(gpointer user_data)
 {
 	GstLibcameraSrc *self = GST_LIBCAMERA_SRC(user_data);
 	GstLibcameraSrcState *state = self->state;

+	{
+		MutexLocker locker(state->lock_);
+		state->wakeup_ = false;
+	}
+
+	bool doPause = true;
+
 	/*
 	 * Create and queue one request. If no buffers are available the
 	 * function returns -ENOBUFS, which we ignore here as that's not a
 	 * fatal error.
 	 */
 	int ret = state->queueRequest();
-	if (ret == -ENOMEM) {
+	switch (ret) {
+	case 0:
+		/*
+		 * The request was successfully queued, there may be enough
+		 * buffers to create a new one. Don't pause the task to give it
+		 * another try.
+		 */
+		doPause = false;
+		break;
+
+	case -ENOMEM:
 		GST_ELEMENT_ERROR(self, RESOURCE, NO_SPACE_LEFT,
 				  ("Failed to allocate request for camera '%s'.",
 				   state->cam_->id().c_str()),
 				  ("libcamera::Camera::createRequest() failed"));
 		gst_task_stop(self->task);
 		return;
+
+	case -ENOBUFS:
+	default:
+		break;
 	}

-	/* Process one completed request, if available. */
+	/*
+	 * Process one completed request, if available. A return value of 0
+	 * means more completed requests are pending: don't pause the task.
+	 */
 	ret = state->processRequest();
 	switch (ret) {
+	case 0:
+		doPause = false;
+		break;
+
 	case -EPIPE:
 		gst_task_stop(self->task);
 		return;

-	case -ENODATA:
-		gst_task_pause(self->task);
-		return;
+	case -ENOBUFS:
+	default:
+		break;
 	}

 	/*
-	 * Here we need to decide if we want to pause. This needs to
-	 * happen in lock step with the callback thread which may want
-	 * to resume the task and might push pending buffers.
+	 * Here we need to decide if we want to pause. This needs to happen in
+	 * lock step with the requestCompleted callback and the buffer-notify
+	 * signal handler that resume the task.
 	 */
-	bool do_pause;
-
-	{
+	if (doPause) {
 		MutexLocker locker(state->lock_);
-		do_pause = state->completedRequests_.empty();
+		if (!state->wakeup_)
+			gst_task_pause(self->task);
 	}
-
-	if (do_pause)
-		gst_task_pause(self->task);
 }
 
 static void
@@ -531,7 +572,8 @@ gst_libcamera_src_task_enter(GstTask *task, [[maybe_unused]] GThread *thread,
 		GstLibcameraPool *pool = gst_libcamera_pool_new(self->allocator,
 								stream_cfg.stream());
 		g_signal_connect_swapped(pool, "buffer-notify",
-					 G_CALLBACK(gst_task_resume), task);
+					 G_CALLBACK(gst_libcamera_src_task_resume),
+					 self);
 
 		gst_libcamera_pad_set_pool(srcpad, pool);
 		gst_flow_combiner_add_pad(self->flow_combiner, srcpad);
-- 
Regards,

Laurent Pinchart



More information about the libcamera-devel mailing list