[libcamera-devel] [PATCH 13/13] gstreamer: Fix race conditions in task pause/resume

Nicolas Dufresne nicolas.dufresne at collabora.com
Tue Jun 28 15:50:41 CEST 2022


Hi Laurent,

This one is difficult to follow; see an idea below.

Le vendredi 24 juin 2022 à 02:22 +0300, Laurent Pinchart a écrit :
> The task run function races with two other threads that want to resume
> the task: the requestCompleted() handler and the buffer-notify signal
> handler. If the former queues completed requests or the latter queues
> back buffers to the pool, and then resumes the task, after the task run
> handler checks the queues but before it attempts to pause the task, then
> the task may be paused without noticing that more work is available.
> 
> The most immediate way to fix this is to take the stream_lock in the
> requestCompleted() and buffer-notify signal handlers, or cover the whole
> task run handler with the GstLibcameraSrcState lock. This could cause
> long delays in the requestCompleted() handler, so that's not a good
> option.
> 
> Instead, add a wakeup flag, protected by the GstLibcameraSrcState lock,
> that allows detection of a lost race, and retry the task run.
> 
> Signed-off-by: Laurent Pinchart <laurent.pinchart at ideasonboard.com>
> ---
>  src/gstreamer/gstlibcamerasrc.cpp | 82 +++++++++++++++++++++++--------
>  1 file changed, 62 insertions(+), 20 deletions(-)
> 
> diff --git a/src/gstreamer/gstlibcamerasrc.cpp b/src/gstreamer/gstlibcamerasrc.cpp
> index 3feb87254916..59400f17ae85 100644
> --- a/src/gstreamer/gstlibcamerasrc.cpp
> +++ b/src/gstreamer/gstlibcamerasrc.cpp
> @@ -120,6 +120,7 @@ struct GstLibcameraSrcState {
>  		LIBCAMERA_TSA_GUARDED_BY(lock_);
>  	std::queue<std::unique_ptr<RequestWrap>> completedRequests_
>  		LIBCAMERA_TSA_GUARDED_BY(lock_);
> +	bool wakeup_ LIBCAMERA_TSA_GUARDED_BY(lock_);
>  
>  	guint group_id_;
>  
> @@ -237,14 +238,16 @@ GstLibcameraSrcState::requestCompleted(Request *request)
>  	{
>  		MutexLocker locker(lock_);
>  		completedRequests_.push(std::move(wrap));
> -	}
> +		wakeup_ = true;
>  
> -	gst_task_resume(src_->task);
> +		gst_task_resume(src_->task);
> +	}
>  }
>  
>  int GstLibcameraSrcState::processRequest()
>  {
>  	std::unique_ptr<RequestWrap> wrap;
> +	int err = 0;
>  
>  	{
>  		MutexLocker locker(lock_);
> @@ -253,10 +256,13 @@ int GstLibcameraSrcState::processRequest()
>  			wrap = std::move(completedRequests_.front());
>  			completedRequests_.pop();
>  		}
> +
> +		if (completedRequests_.empty())
> +			err = -ENOBUFS;
>  	}
>  
>  	if (!wrap)
> -		return -ENODATA;
> +		return -ENOBUFS;
>  
>  	GstFlowReturn ret = GST_FLOW_OK;
>  	gst_flow_combiner_reset(src_->flow_combiner);
> @@ -296,7 +302,7 @@ int GstLibcameraSrcState::processRequest()
>  		return -EPIPE;
>  	}
>  
> -	return 0;
> +	return err;
>  }
>  
>  static bool
> @@ -360,53 +366,88 @@ gst_libcamera_src_open(GstLibcameraSrc *self)
>  	return true;
>  }
>  
> +static void
> +gst_libcamera_src_task_resume(gpointer user_data)
> +{
> +	GstLibcameraSrc *self = GST_LIBCAMERA_SRC(user_data);
> +	GstLibcameraSrcState *state = self->state;
> +
> +	MutexLocker locker(state->lock_);
> +	state->wakeup_ = true;
> +	gst_task_resume(self->task);
> +}
> +
>  static void
>  gst_libcamera_src_task_run(gpointer user_data)
>  {
>  	GstLibcameraSrc *self = GST_LIBCAMERA_SRC(user_data);
>  	GstLibcameraSrcState *state = self->state;
>  
> +	{
> +		MutexLocker locker(state->lock_);
> +		state->wakeup_ = true;
> +	}
> +
> +	bool doPause = true;
> +
>  	/*
>  	 * Create and queue one request. If no buffers are available the
>  	 * function returns -ENOBUFS, which we ignore here as that's not a
>  	 * fatal error.
>  	 */
>  	int ret = state->queueRequest();
> -	if (ret == -ENOMEM) {
> +	switch (ret) {
> +	case 0:
> +		/*
> +		 * The request was successfully queued, there may be enough
> +		 * buffers to create a new one. Don't pause the task to give it
> +		 * another try.
> +		 */
> +		doPause = false;
> +		break;
> +
> +	case -ENOMEM:
>  		GST_ELEMENT_ERROR(self, RESOURCE, NO_SPACE_LEFT,
>  				  ("Failed to allocate request for camera '%s'.",
>  				   state->cam_->id().c_str()),
>  				  ("libcamera::Camera::createRequest() failed"));
>  		gst_task_stop(self->task);
>  		return;
> +
> +	case -ENOBUFS:
> +	default:
> +		break;
>  	}
>  
> -	/* Process one completed request, if available. */
> +	/*
> +	 * Process one completed request, if available, and record if further
> +	 * requests are ready for processing.
> +	 */
>  	ret = state->processRequest();
>  	switch (ret) {
> +	case -ENOBUFS:
> +		doPause = false;
> +		break;
> +
>  	case -EPIPE:
>  		gst_task_stop(self->task);
>  		return;
>  
> -	case -ENODATA:
> -		gst_task_pause(self->task);
> -		return;
> +	case 0:
> +	default:
> +		break;
>  	}
>  
>  	/*
> -	 * Here we need to decide if we want to pause. This needs to
> -	 * happen in lock step with the callback thread which may want
> -	 * to resume the task and might push pending buffers.
> +	 * Here we need to decide if we want to pause. This needs to happen in
> +	 * lock step with the requestCompleted callback and the buffer-notify
> +	 * signal handler that resume the task.
>  	 */
> -	bool do_pause;
> -
> -	{
> +	if (doPause) {
>  		MutexLocker locker(state->lock_);
> -		do_pause = state->completedRequests_.empty();
> +		if (!state->wakeup_)
> +			gst_task_pause(self->task);

This dance is difficult to follow. Perhaps we could start the run function by
"pausing" the task? We would then resume the task if state->processRequest() ==
0, and it could also be resumed concurrently when a request completes. That
would mean using a doResume bool instead of doPause.

>  	}
> -
> -	if (do_pause)
> -		gst_task_pause(self->task);
>  }
>  
>  static void
> @@ -517,7 +558,8 @@ gst_libcamera_src_task_enter(GstTask *task, [[maybe_unused]] GThread *thread,
>  		GstLibcameraPool *pool = gst_libcamera_pool_new(self->allocator,
>  								stream_cfg.stream());
>  		g_signal_connect_swapped(pool, "buffer-notify",
> -					 G_CALLBACK(gst_task_resume), task);
> +					 G_CALLBACK(gst_libcamera_src_task_resume),
> +					 self);
>  
>  		gst_libcamera_pad_set_pool(srcpad, pool);
>  		gst_flow_combiner_add_pad(self->flow_combiner, srcpad);



More information about the libcamera-devel mailing list