[libcamera-devel] [PATCH 13/13] gstreamer: Fix race conditions in task pause/resume

Laurent Pinchart laurent.pinchart at ideasonboard.com
Thu Jun 30 01:50:09 CEST 2022


Hi Nicolas,

On Tue, Jun 28, 2022 at 09:50:41AM -0400, Nicolas Dufresne wrote:
> Hi Laurent,
> 
> this one is difficult to follow, see some ideas below.

That matches the experience I had writing it :-)

> Le vendredi 24 juin 2022 à 02:22 +0300, Laurent Pinchart a écrit :
> > The task run function races with two other threads that want to resume
> > the task: the requestCompleted() handler and the buffer-notify signal
> > handler. If the former queues completed requests or the latter queues
> > back buffers to the pool, and then resumes the task, after the task run
> > handler checks the queues but before it attempts to pause the task, then
> > the task may be paused without noticing that more work is available.
> > 
> > The most immediate way to fix this is to take the stream_lock in the
> > requestCompleted() and buffer-notify signal handlers, or cover the whole
> > task run handler with the GstLibcameraSrcState lock. This could cause
> > long delays in the requestCompleted() handler, so that's not a good
> > option.
> > 
> > Instead, add a wakeup flag, protected by the GstLibcameraSrcState lock,
> > that allows detection of a lost race, and retry the task run.
> > 
> > Signed-off-by: Laurent Pinchart <laurent.pinchart at ideasonboard.com>
> > ---
> >  src/gstreamer/gstlibcamerasrc.cpp | 82 +++++++++++++++++++++++--------
> >  1 file changed, 62 insertions(+), 20 deletions(-)
> > 
> > diff --git a/src/gstreamer/gstlibcamerasrc.cpp b/src/gstreamer/gstlibcamerasrc.cpp
> > index 3feb87254916..59400f17ae85 100644
> > --- a/src/gstreamer/gstlibcamerasrc.cpp
> > +++ b/src/gstreamer/gstlibcamerasrc.cpp
> > @@ -120,6 +120,7 @@ struct GstLibcameraSrcState {
> >  		LIBCAMERA_TSA_GUARDED_BY(lock_);
> >  	std::queue<std::unique_ptr<RequestWrap>> completedRequests_
> >  		LIBCAMERA_TSA_GUARDED_BY(lock_);
> > +	bool wakeup_ LIBCAMERA_TSA_GUARDED_BY(lock_);
> >  
> >  	guint group_id_;
> >  
> > @@ -237,14 +238,16 @@ GstLibcameraSrcState::requestCompleted(Request *request)
> >  	{
> >  		MutexLocker locker(lock_);
> >  		completedRequests_.push(std::move(wrap));
> > -	}
> > +		wakeup_ = true;
> >  
> > -	gst_task_resume(src_->task);
> > +		gst_task_resume(src_->task);
> > +	}
> >  }
> >  
> >  int GstLibcameraSrcState::processRequest()
> >  {
> >  	std::unique_ptr<RequestWrap> wrap;
> > +	int err = 0;
> >  
> >  	{
> >  		MutexLocker locker(lock_);
> > @@ -253,10 +256,13 @@ int GstLibcameraSrcState::processRequest()
> >  			wrap = std::move(completedRequests_.front());
> >  			completedRequests_.pop();
> >  		}
> > +
> > +		if (completedRequests_.empty())
> > +			err = -ENOBUFS;
> >  	}
> >  
> >  	if (!wrap)
> > -		return -ENODATA;
> > +		return -ENOBUFS;
> >  
> >  	GstFlowReturn ret = GST_FLOW_OK;
> >  	gst_flow_combiner_reset(src_->flow_combiner);
> > @@ -296,7 +302,7 @@ int GstLibcameraSrcState::processRequest()
> >  		return -EPIPE;
> >  	}
> >  
> > -	return 0;
> > +	return err;
> >  }
> >  
> >  static bool
> > @@ -360,53 +366,88 @@ gst_libcamera_src_open(GstLibcameraSrc *self)
> >  	return true;
> >  }
> >  
> > +static void
> > +gst_libcamera_src_task_resume(gpointer user_data)
> > +{
> > +	GstLibcameraSrc *self = GST_LIBCAMERA_SRC(user_data);
> > +	GstLibcameraSrcState *state = self->state;
> > +
> > +	MutexLocker locker(state->lock_);
> > +	state->wakeup_ = true;
> > +	gst_task_resume(self->task);
> > +}
> > +
> >  static void
> >  gst_libcamera_src_task_run(gpointer user_data)
> >  {
> >  	GstLibcameraSrc *self = GST_LIBCAMERA_SRC(user_data);
> >  	GstLibcameraSrcState *state = self->state;
> >  
> > +	{
> > +		MutexLocker locker(state->lock_);
> > +		state->wakeup_ = true;

Looks like there's a bug here: wakeup_ needs to be set to false. It's
always true otherwise.

> > +	}
> > +
> > +	bool doPause = true;
> > +
> >  	/*
> >  	 * Create and queue one request. If no buffers are available the
> >  	 * function returns -ENOBUFS, which we ignore here as that's not a
> >  	 * fatal error.
> >  	 */
> >  	int ret = state->queueRequest();
> > -	if (ret == -ENOMEM) {
> > +	switch (ret) {
> > +	case 0:
> > +		/*
> > +		 * The request was successfully queued, there may be enough
> > +		 * buffers to create a new one. Don't pause the task to give it
> > +		 * another try.
> > +		 */
> > +		doPause = false;
> > +		break;
> > +
> > +	case -ENOMEM:
> >  		GST_ELEMENT_ERROR(self, RESOURCE, NO_SPACE_LEFT,
> >  				  ("Failed to allocate request for camera '%s'.",
> >  				   state->cam_->id().c_str()),
> >  				  ("libcamera::Camera::createRequest() failed"));
> >  		gst_task_stop(self->task);
> >  		return;
> > +
> > +	case -ENOBUFS:
> > +	default:
> > +		break;
> >  	}
> >  
> > -	/* Process one completed request, if available. */
> > +	/*
> > +	 * Process one completed request, if available, and record if further
> > +	 * requests are ready for processing.
> > +	 */
> >  	ret = state->processRequest();
> >  	switch (ret) {
> > +	case -ENOBUFS:
> > +		doPause = false;

And here too, this needs to go to case 0.

> > +		break;
> > +
> >  	case -EPIPE:
> >  		gst_task_stop(self->task);
> >  		return;
> >  
> > -	case -ENODATA:
> > -		gst_task_pause(self->task);
> > -		return;
> > +	case 0:
> > +	default:
> > +		break;
> >  	}
> >  
> >  	/*
> > -	 * Here we need to decide if we want to pause. This needs to
> > -	 * happen in lock step with the callback thread which may want
> > -	 * to resume the task and might push pending buffers.
> > +	 * Here we need to decide if we want to pause. This needs to happen in
> > +	 * lock step with the requestCompleted callback and the buffer-notify
> > +	 * signal handler that resume the task.
> >  	 */
> > -	bool do_pause;
> > -
> > -	{
> > +	if (doPause) {
> >  		MutexLocker locker(state->lock_);
> > -		do_pause = state->completedRequests_.empty();
> > +		if (!state->wakeup_)
> > +			gst_task_pause(self->task);
> 
> This dance is difficult to follow. Perhaps we could start the run function by
> "pausing" the task? Then we will resume the task if state->processRequest() ==
> 0, or concurrently if a request completed. Using a doResume bool instead of
> doPause.

I've given it a try, and the result is as follows:

diff --git a/src/gstreamer/gstlibcamerasrc.cpp b/src/gstreamer/gstlibcamerasrc.cpp
index 6ee315cf5efe..324ea457eb4f 100644
--- a/src/gstreamer/gstlibcamerasrc.cpp
+++ b/src/gstreamer/gstlibcamerasrc.cpp
@@ -411,7 +411,9 @@ gst_libcamera_src_task_run(gpointer user_data)
 		state->wakeup_ = false;
 	}
 
-	bool doPause = true;
+	bool doResume = false;
+
+	gst_task_pause(self->task);
 
 	/*
 	 * Create and queue one request. If no buffers are available the
@@ -426,7 +428,7 @@ gst_libcamera_src_task_run(gpointer user_data)
 		 * buffers to create a new one. Don't pause the task to give it
 		 * another try.
 		 */
-		doPause = false;
+		doResume = true;
 		break;
 
 	case -ENOMEM:
@@ -449,11 +451,8 @@ gst_libcamera_src_task_run(gpointer user_data)
 	ret = state->processRequest();
 	switch (ret) {
 	case 0:
-		/*
-		 * Another completed request is available, don't pause the
-		 * task.
-		 */
-		doPause = false;
+		/* Another completed request is available, resume the task. */
+		doResume = true;
 		break;
 
 	case -EPIPE:
@@ -466,13 +466,13 @@ gst_libcamera_src_task_run(gpointer user_data)
 	}
 
 	/*
-	 * Here we need to decide if we want to pause. This needs to happen in
+	 * Here we need to decide if we want to resume. This needs to happen in
 	 * lock step with the requestCompleted callback and the buffer-notify
 	 * signal handler that resume the task.
 	 */
-	if (doPause) {
+	{
 		MutexLocker locker(state->lock_);
-		if (!state->wakeup_)
-			gst_task_pause(self->task);
+		if (doResume || state->wakeup_)
+			gst_task_resume(self->task);
 	}
 }

I'm not sure that's what you meant, as it doesn't seem much easier to
follow.

With the mechanism from v1 (and the two fixes mentioned above), the task
runs ~80 times / second, is resumed ~55 times / second, and paused ~26
times / second. With the opposite logic, we get ~80 pause/second (that's
expected, as it's paused at every run) and ~107 resume/second. That
seems less efficient to me.

I'll use the original mechanism with the bug fixes in v2. If you meant
something other than the above, you can explain it in a reply to the new
patch.

> >  	}
> > -
> > -	if (do_pause)
> > -		gst_task_pause(self->task);
> >  }
> >  
> >  static void
> > @@ -517,7 +558,8 @@ gst_libcamera_src_task_enter(GstTask *task, [[maybe_unused]] GThread *thread,
> >  		GstLibcameraPool *pool = gst_libcamera_pool_new(self->allocator,
> >  								stream_cfg.stream());
> >  		g_signal_connect_swapped(pool, "buffer-notify",
> > -					 G_CALLBACK(gst_task_resume), task);
> > +					 G_CALLBACK(gst_libcamera_src_task_resume),
> > +					 self);
> >  
> >  		gst_libcamera_pad_set_pool(srcpad, pool);
> >  		gst_flow_combiner_add_pad(self->flow_combiner, srcpad);

-- 
Regards,

Laurent Pinchart


More information about the libcamera-devel mailing list