<div dir='auto'><div><br><div class="gmail_extra"><br><div class="gmail_quote">On Jul. 1, 2022 at 03:02, Umang Jain <umang.jain@ideasonboard.com> wrote:<br type="attribution"><blockquote class="quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex"><p dir="ltr">Hi Nicolas,
<br>
<br>
On 7/1/22 01:11, Nicolas Dufresne wrote:
<br>
> On Thursday, 30 June 2022 at 13:48 +0300, Laurent Pinchart wrote:
<br>
>> On Thu, Jun 30, 2022 at 03:31:57PM +0530, Umang Jain wrote:
<br>
>>> Hi Laurent,
<br>
>>>
<br>
>>> (still working through this patch)...
<br>
>>>
<br>
>>> On 6/30/22 05:32, Laurent Pinchart via libcamera-devel wrote:
<br>
>>>> The task run function races with two other threads that want to resume
<br>
>>>> the task: the requestCompleted() handler and the buffer-notify signal
<br>
>>> What do you mean by "two other threads" ?
<br>
>>>
<br>
>>> I think the "buffer-notify" signal is emitted in the task-thread itself.
<br>
>>> Are gst_libcamera_src_task_enter() and the task function itself (i.e.
<br>
>>> gst_libcamera_src_task_run()) different threads? I think it's the
<br>
>>> same thread.
<br>
>> The buffer-notify signal is emitted by
<br>
>> gst_libcamera_pool_release_buffer(), which is called by GStreamer from a
<br>
>> different thread.
<br>
> It's more complex than this of course, since it's GStreamer. But the buffer-notify
<br>
> is called from a streaming thread, so it may be re-entrant to the gst_pad_push()
<br>
> call, or it's from another thread. This callback holds one or more stream locks,
<br>
> including ours, though stream locks are recursive for that reason.
<br>
<br>
<br>
OK, then I need to see where this different thread is coming from.
<br>
<br>
To me, "buffer-notify" is something we (as in GstLibcameraPool) define
<br>
and a GstLibcameraPool is created inside gst_libcamera_src_task_enter()
<br>
- signals
<br>
and callbacks are attached here ... so it does give an "impression" that
<br>
it would be
<br>
the same thread as the task function's one.
<br>
<br>
So, something for me to go through once I have time...</p></blockquote></div></div></div><div dir="auto"><br></div><div dir="auto">In GStreamer, buffer ownership is always given away to the next element in the directed graph. The buffer and its memory objects are refcounted. Pools (both the buffer pool and our allocator implementation, which is also a pool) are notified right before these are dropped by a downstream element (when the refcount reaches 0), so that they can be reused. </div><div dir="auto"><br></div><div dir="auto">In the libcamerasrc design, when more free buffers become available, we need to resume the task if it is paused, as it may need to issue more requests. The callback is thus emitted when a buffer is unreffed downstream. We don't control which thread will drop the last ref, which is why we wake our task, so that further requests are issued from our thread.</div><div dir="auto"><br></div><div dir="auto">Sending requests from a random thread would create uncontrollable contention, and likely unsolvable deadlock situations.</div><div dir="auto"><br></div><div dir="auto"><div class="gmail_extra"><div class="gmail_quote"><blockquote class="quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex"><p dir="ltr">
<br>
<br>
>
<br>
>>> If my understanding above is correct, the resume of task races with
<br>
>>> requestCompleted() handler thread only.
<br>
>>>
<br>
>>> Of course, I need to see how the task_resume is affecting the task_run
<br>
>>> from different threads but I am still trying to grasp the issue.
<br>
>>>
<br>
>>>> handler. If the former queues completed requests or the latter queues
<br>
>>>> back buffers to the pool, and then resumes the task, after the task run
<br>
>>>> handler checks the queues but before it attempts to pause the task, then
<br>
>>>> the task may be paused without noticing that more work is available.
<br>
>>>>
<br>
>>>> The most immediate way to fix this is to take the stream_lock in the
<br>
>>>> requestCompleted() and buffer-notify signal handlers, or cover the whole
<br>
>>>> task run handler with the GstLibcameraSrcState lock. This could cause
<br>
>>>> long delays in the requestCompleted() handler, so that's not a good
<br>
>>>> option.
<br>
>>>>
<br>
>>>> Instead, add a wakeup flag, protected by the GstLibcameraSrcState lock,
<br>
>>>> that allows detection of a lost race, and retry the task run.
<br>
>>>>
<br>
>>>> Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
<br>
>>>> ---
<br>
>>>> Changes since v1:
<br>
>>>>
<br>
>>>> - Fix incorrect wakeup and pause logic
<br>
>>>> ---
<br>
>>>> src/gstreamer/gstlibcamerasrc.cpp | 84 +++++++++++++++++++++++--------
<br>
>>>> 1 file changed, 63 insertions(+), 21 deletions(-)
<br>
>>>>
<br>
>>>> diff --git a/src/gstreamer/gstlibcamerasrc.cpp b/src/gstreamer/gstlibcamerasrc.cpp
<br>
>>>> index 9ea59631a9f2..5471ab951252 100644
<br>
>>>> --- a/src/gstreamer/gstlibcamerasrc.cpp
<br>
>>>> +++ b/src/gstreamer/gstlibcamerasrc.cpp
<br>
>>>> @@ -118,7 +118,7 @@ struct GstLibcameraSrcState {
<br>
>>>> /*
<br>
>>>> * Contention on this lock_ must be minimized, as it has to be taken in
<br>
>>>> * the realtime-sensitive requestCompleted() handler to protect
<br>
>>>> - * queuedRequests_ and completedRequests_.
<br>
>>>> + * queuedRequests_, completedRequests_ and wakeup_.
<br>
>>>> *
<br>
>>>> * stream_lock must be taken before lock_ in contexts where both locks
<br>
>>>> * need to be taken. In particular, this means that the lock_ must not
<br>
>>>> @@ -130,6 +130,7 @@ struct GstLibcameraSrcState {
<br>
>>>> LIBCAMERA_TSA_GUARDED_BY(lock_);
<br>
>>>> std::queue<std::unique_ptr<RequestWrap>> completedRequests_
<br>
>>>> LIBCAMERA_TSA_GUARDED_BY(lock_);
<br>
>>>> + bool wakeup_ LIBCAMERA_TSA_GUARDED_BY(lock_);
<br>
>>>>
<br>
>>>> guint group_id_;
<br>
>>>>
<br>
>>>> @@ -250,15 +251,17 @@ GstLibcameraSrcState::requestCompleted(Request *request)
<br>
>>>> {
<br>
>>>> MutexLocker locker(lock_);
<br>
>>>> completedRequests_.push(std::move(wrap));
<br>
>>>> - }
<br>
>>>> + wakeup_ = true;
<br>
>>>>
<br>
>>>> - gst_task_resume(src_->task);
<br>
>>>> + gst_task_resume(src_->task);
<br>
>>>> + }
<br>
>>>> }
<br>
>>>>
<br>
>>>> /* Must be called with stream_lock held. */
<br>
>>>> int GstLibcameraSrcState::processRequest()
<br>
>>>> {
<br>
>>>> std::unique_ptr<RequestWrap> wrap;
<br>
>>>> + int err = 0;
<br>
>>>>
<br>
>>>> {
<br>
>>>> MutexLocker locker(lock_);
<br>
>>>> @@ -267,10 +270,13 @@ int GstLibcameraSrcState::processRequest()
<br>
>>>> wrap = std::move(completedRequests_.front());
<br>
>>>> completedRequests_.pop();
<br>
>>>> }
<br>
>>>> +
<br>
>>>> + if (completedRequests_.empty())
<br>
>>>> + err = -ENOBUFS;
<br>
>>>> }
<br>
>>>>
<br>
>>>> if (!wrap)
<br>
>>>> - return -ENODATA;
<br>
>>>> + return -ENOBUFS;
<br>
>>>>
<br>
>>>> GstFlowReturn ret = GST_FLOW_OK;
<br>
>>>> gst_flow_combiner_reset(src_->flow_combiner);
<br>
>>>> @@ -310,7 +316,7 @@ int GstLibcameraSrcState::processRequest()
<br>
>>>> return -EPIPE;
<br>
>>>> }
<br>
>>>>
<br>
>>>> - return 0;
<br>
>>>> + return err;
<br>
>>>> }
<br>
>>>>
<br>
>>>> static bool
<br>
>>>> @@ -374,53 +380,88 @@ gst_libcamera_src_open(GstLibcameraSrc *self)
<br>
>>>> return true;
<br>
>>>> }
<br>
>>>>
<br>
>>>> +static void
<br>
>>>> +gst_libcamera_src_task_resume(gpointer user_data)
<br>
>>>> +{
<br>
>>>> + GstLibcameraSrc *self = GST_LIBCAMERA_SRC(user_data);
<br>
>>>> + GstLibcameraSrcState *state = self->state;
<br>
>>>> +
<br>
>>>> + MutexLocker locker(state->lock_);
<br>
>>>> + state->wakeup_ = true;
<br>
>>>> + gst_task_resume(self->task);
<br>
>>>> +}
<br>
>>>> +
<br>
>>>> static void
<br>
>>>> gst_libcamera_src_task_run(gpointer user_data)
<br>
>>>> {
<br>
>>>> GstLibcameraSrc *self = GST_LIBCAMERA_SRC(user_data);
<br>
>>>> GstLibcameraSrcState *state = self->state;
<br>
>>>>
<br>
>>>> + {
<br>
>>>> + MutexLocker locker(state->lock_);
<br>
>>>> + state->wakeup_ = false;
<br>
>>>> + }
<br>
>>>> +
<br>
>>>> + bool doPause = true;
<br>
>>>> +
<br>
>>>> /*
<br>
>>>> * Create and queue one request. If no buffers are available the
<br>
>>>> * function returns -ENOBUFS, which we ignore here as that's not a
<br>
>>>> * fatal error.
<br>
>>>> */
<br>
>>>> int ret = state->queueRequest();
<br>
>>>> - if (ret == -ENOMEM) {
<br>
>>>> + switch (ret) {
<br>
>>>> + case 0:
<br>
>>>> + /*
<br>
>>>> + * The request was successfully queued, there may be enough
<br>
>>>> + * buffers to create a new one. Don't pause the task to give it
<br>
>>>> + * another try.
<br>
>>>> + */
<br>
>>>> + doPause = false;
<br>
>>>> + break;
<br>
>>>> +
<br>
>>>> + case -ENOMEM:
<br>
>>>> GST_ELEMENT_ERROR(self, RESOURCE, NO_SPACE_LEFT,
<br>
>>>> ("Failed to allocate request for camera '%s'.",
<br>
>>>> state->cam_->id().c_str()),
<br>
>>>> ("libcamera::Camera::createRequest() failed"));
<br>
>>>> gst_task_stop(self->task);
<br>
>>>> return;
<br>
>>>> +
<br>
>>>> + case -ENOBUFS:
<br>
>>>> + default:
<br>
>>>> + break;
<br>
>>>> }
<br>
>>>>
<br>
>>>> - /* Process one completed request, if available. */
<br>
>>>> + /*
<br>
>>>> + * Process one completed request, if available, and record if further
<br>
>>>> + * requests are ready for processing.
<br>
>>>> + */
<br>
>>>> ret = state->processRequest();
<br>
>>>> switch (ret) {
<br>
>>>> + case -ENOBUFS:
<br>
>>>> + doPause = false;
<br>
>>>> + break;
<br>
>>>> +
<br>
>>>> case -EPIPE:
<br>
>>>> gst_task_stop(self->task);
<br>
>>>> return;
<br>
>>>>
<br>
>>>> - case -ENODATA:
<br>
>>>> - gst_task_pause(self->task);
<br>
>>>> - return;
<br>
>>>> + case 0:
<br>
>>>> + default:
<br>
>>>> + break;
<br>
>>>> }
<br>
>>>>
<br>
>>>> /*
<br>
>>>> - * Here we need to decide if we want to pause. This needs to
<br>
>>>> - * happen in lock step with the callback thread which may want
<br>
>>>> - * to resume the task and might push pending buffers.
<br>
>>>> + * Here we need to decide if we want to pause. This needs to happen in
<br>
>>>> + * lock step with the requestCompleted callback and the buffer-notify
<br>
>>>> + * signal handler that resume the task.
<br>
>>>> */
<br>
>>>> - bool do_pause;
<br>
>>>> -
<br>
>>>> - {
<br>
>>>> + if (doPause) {
<br>
>>>> MutexLocker locker(state->lock_);
<br>
>>>> - do_pause = state->completedRequests_.empty();
<br>
>>>> + if (!state->wakeup_)
<br>
>>>> + gst_task_pause(self->task);
<br>
>>>> }
<br>
>>>> -
<br>
>>>> - if (do_pause)
<br>
>>>> - gst_task_pause(self->task);
<br>
>>>> }
<br>
>>>>
<br>
>>>> static void
<br>
>>>> @@ -531,7 +572,8 @@ gst_libcamera_src_task_enter(GstTask *task, [[maybe_unused]] GThread *thread,
<br>
>>>> GstLibcameraPool *pool = gst_libcamera_pool_new(self->allocator,
<br>
>>>> stream_cfg.stream());
<br>
>>>> g_signal_connect_swapped(pool, "buffer-notify",
<br>
>>>> - G_CALLBACK(gst_task_resume), task);
<br>
>>>> + G_CALLBACK(gst_libcamera_src_task_resume),
<br>
>>>> + self);
<br>
>>>>
<br>
>>>> gst_libcamera_pad_set_pool(srcpad, pool);
<br>
>>>> gst_flow_combiner_add_pad(self->flow_combiner, srcpad);
<br>
</p>
</blockquote></div><br></div></div></div>
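<div dir="auto"><br></div><div dir="auto">To make the wakeup flag idea from the quoted patch a bit more concrete, here is a minimal sketch of how a lost wakeup can be detected. It assumes plain C++ with std::mutex instead of the GStreamer and libcamera types, and TaskGate with its methods are hypothetical names for illustration only, not part of libcamerasrc.</div>

```cpp
#include <mutex>

// Hypothetical stand-in for the task state; not the real libcamerasrc types.
class TaskGate
{
public:
	// Called from any thread that produced work, for example the thread
	// that drops the last ref on a buffer. Flag and resume happen under
	// the same lock so the task cannot miss the notification.
	void resume()
	{
		std::lock_guard<std::mutex> locker(lock_);
		wakeup_ = true;
		paused_ = false;
	}

	// Called by the task at the start of each iteration, before it
	// checks its queues.
	void beginIteration()
	{
		std::lock_guard<std::mutex> locker(lock_);
		wakeup_ = false;
	}

	// Called by the task at the end of an iteration that found no work.
	// Returns true if the task was paused, false if a resume() arrived
	// since beginIteration() and the task must run again.
	bool pauseIfIdle()
	{
		std::lock_guard<std::mutex> locker(lock_);
		if (wakeup_)
			return false;

		paused_ = true;
		return true;
	}

	bool paused() const
	{
		std::lock_guard<std::mutex> locker(lock_);
		return paused_;
	}

private:
	mutable std::mutex lock_;
	bool wakeup_ = false;
	bool paused_ = false;
};
```

<div dir="auto">In the patch itself, gst_task_resume() and gst_task_pause() play the role of the paused_ updates here, and the flag lives in GstLibcameraSrcState, guarded by lock_. The point is only that a resume landing between the queue check and the pause is noticed, so the task runs one more iteration instead of going to sleep with work pending.</div>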