[libcamera-devel] [PATCH v5 1/4] android: Notify post processing completion via a signal
Umang Jain
umang.jain at ideasonboard.com
Thu Oct 21 16:34:18 CEST 2021
Hi Laurent,
On 10/21/21 7:22 PM, Laurent Pinchart wrote:
> Hi Umang,
>
> On Thu, Oct 21, 2021 at 11:05:44AM +0530, Umang Jain wrote:
>> On 10/21/21 6:01 AM, Laurent Pinchart wrote:
>>> On Wed, Oct 20, 2021 at 04:12:09PM +0530, Umang Jain wrote:
>>>> Notify that the post processing for a request has been completed,
>>>> via a signal. A pointer to the descriptor which is tracking the
>>>> capture request is emitted along with the status of post processed
>>>> buffer. The function CameraDevice::streamProcessingComplete() will
>>>> finally set the status on the request descriptor and send capture
>>>> results back to the framework accordingly.
>>>>
>>>> We also need to save a pointer to any internal buffers that might have
>>>> been allocated by CameraStream. The buffer should be returned back to
>>>> CameraStream just before capture results are sent.
>>>>
>>>> A streamProcessMutex_ has been introduced here itself, which will be
>>> Did you mean s/itself/as well/ ?
>>>
>>>> applicable to guard access to descriptor->buffers_ when post-processing
>>>> is moved to be asynchronous in subsequent commits.
>>>>
>>>> Signed-off-by: Umang Jain <umang.jain at ideasonboard.com>
>>>> ---
>>>> src/android/camera_device.cpp | 92 +++++++++++++++++++++---
>>>> src/android/camera_device.h | 7 ++
>>>> src/android/camera_request.h | 4 ++
>>>> src/android/camera_stream.cpp | 13 ++++
>>>> src/android/jpeg/post_processor_jpeg.cpp | 2 +
>>>> src/android/post_processor.h | 9 +++
>>>> src/android/yuv/post_processor_yuv.cpp | 10 ++-
>>>> 7 files changed, 125 insertions(+), 12 deletions(-)
>>>>
>>>> diff --git a/src/android/camera_device.cpp b/src/android/camera_device.cpp
>>>> index 806b4090..541c2c81 100644
>>>> --- a/src/android/camera_device.cpp
>>>> +++ b/src/android/camera_device.cpp
>>>> @@ -1117,6 +1117,15 @@ void CameraDevice::requestComplete(Request *request)
>>>> }
>>>>
>>>> /* Handle post-processing. */
>>>> + bool needsPostProcessing = false;
>>>> + Camera3RequestDescriptor::Status processingStatus =
>>>> + Camera3RequestDescriptor::Status::Pending;
>>>> + /*
>>>> + * \todo Apply streamsProcessMutex_ when post-processing is adapted to run
>>>> + * asynchronously. If we introduce the streamsProcessMutex_ right away, the
>>>> + * lock will be held forever since it is synchronous at this point
>>>> + * (see streamProcessingComplete).
>>>> + */
>>>> for (auto &buffer : descriptor->buffers_) {
>>>> CameraStream *stream = buffer.stream;
>>>>
>>>> @@ -1132,22 +1141,27 @@ void CameraDevice::requestComplete(Request *request)
>>>> continue;
>>>> }
>>>>
>>>> - int ret = stream->process(*src, buffer, descriptor);
>>>> -
>>>> - /*
>>>> - * Return the FrameBuffer to the CameraStream now that we're
>>>> - * done processing it.
>>>> - */
>>>> if (stream->type() == CameraStream::Type::Internal)
>>>> - stream->putBuffer(src);
>>>> + buffer.internalBuffer = src;
>>> Let's make the field name more self-explanatory. A possible candidate is
>>> sourceBuffer instead of internalBuffer.
>>>
>>> Could we do this in CameraDevice::processCaptureRequest(), just after
>>> calling cameraStream->getBuffer() ? I'll feel less worried about a leak
>>> if we store the buffer for later release right after acquiring it.
>> Ack.
>>
>>>>
>>>> + needsPostProcessing = true;
>>>> + int ret = stream->process(*src, buffer, descriptor);
>>>> if (ret) {
>>>> - buffer.status = Camera3RequestDescriptor::Status::Error;
>>>> - notifyError(descriptor->frameNumber_, stream->camera3Stream(),
>>>> - CAMERA3_MSG_ERROR_BUFFER);
>>>> + setBufferStatus(stream, buffer, descriptor,
>>>> + Camera3RequestDescriptor::Status::Error);
>>>> + processingStatus = Camera3RequestDescriptor::Status::Error;
>>>> }
>>> I think we need to improve error handling a little bit, in order to
>>> support multiple streams being post-processed. We need to complete the
>>> request in CameraDevice::streamProcessingComplete() when the last
>>> processing stream completes, so the number of post-processed streams
>>> need to be somehow recorded in the Camera3RequestDescriptor. It could be
>>> tempted to have a counter, but a
>> I have implemented a counter based solution, it still didn't address all
>> the corner cases. Since we have two path of error handling (synchronous
>> error and asynchronous error), a counter based implementation was
>> probably too verbose with much of code duplication to handle various
>> scenarios.
>>
>>> std::map<CameraStream *, StreamBuffer *> pendingPostProcess_;
>>>
>>> could be better, as it will also remove the need for loops in
>>> streamProcessingComplete() to lookup the buffer from the stream and to
>>> check if the request has been fully processed. Accessing the map has to
>>> be done with the mutex held to avoid race conditions, but it's a bit
>>> more tricky than that. If we just add elements to the map here, we could
>> Good, so you do understand so many looping lookups in
>> streamProcessingComplete.
>>
>> I broadly feel okay with std::map<CameraStream *, StreamBuffer *>
>> pendingPostProcess_; as long as it resides inside the descriptor. For
>> me, it avoids lookups, and can address the \todo in 3/4.
>>
>> Having said that, I feel a bit wary of introducing more data structures
>> for management. We already have a queue of post-processing (convert to
>> std:deque if we want to access the queued requests for post-processing)
>> and now we will have another map to tracking the requests. I wonder if
>> we can get away with a PostProcessorWorker's std::deque, with the
>> benefits of the map. But again, we have to take care of exposure / mutex
>> with main thread, which can lead to subtle bugs and unwanted waiting on
>> each thread
>>
>> Now I think about it, the pendingPostProcess_ map is about Stream's
>> pending processing _per descriptor_, whereas the PostProcessorWorker's
>> queue is replacement for Thread's message queue. Hmm.
> That's right, those two data structures are independent from each other.
>
>>> race with streamProcessingComplete() as it could be called for the first
>>> post-processed stream before we have a chance to add the second one
>>> here. streamProcessingComplete() would then incorrectly think it has
>>> completed the last stream, and proceed to complete the request.
>>>
>>> This race could be solved by holding the lock for the whole duration of
>>> the loop here, covering all calls to stream->process(), but it may be
>> Yes, I am considering this in current implementation too. The way it
>> will look like, is first queue up all the post-processing requests to
>> PostProcessorWoe, only then, allow any of the
>> slot(streamProcessingComplete) to get executed. A simple mutex hold can
>> do that
> I usually try to minimize the amount of code covered by locks, and focus
> on data access only if possible instead of keeping the lock held over
> the callbacks. Let's see if that's possible. There's a tricky part
> though, as sendCaptureResults() is called from two different threads
> (synchronously with request queuing in error paths and asynchronously in
> the completion path), we currently have a race condition.
>
> Thread A Thread B
> -----------------------------------------------------------------------------------------------------------
>
> MutexLocker lock(descriptorsMutex_);
> while (!descriptors_.empty() &&
> !descriptors_.front()->isPending()) {
> auto descriptor = std::move(descriptors_.front());
> descriptors_.pop();
>
> lock.unlock();
> MutexLocker lock(descriptorsMutex_);
> while (!descriptors_.empty() &&
> !descriptors_.front()->isPending()) {
> auto descriptor = std::move(descriptors_.front());
> descriptors_.pop();
>
> lock.unlock();
>
> ...
>
> callbacks_->process_capture_result(callbacks_,
> &captureResult);
>
> lock.lock();
> }
> ...
>
> callbacks_->process_capture_result(callbacks_,
> &captureResult);
>
> lock.lock();
> }
>
> Thread A gets the request at the top of the queue, thread B gets the
> next one, and signals its completion before thread A.
>
> It's likely possible to implement something clever here, to minimize the
> locked sections and still be correct, but that may be unnecessary
> optimization. We could start by holding the lock for the whole duration
> of the function. In that case, I would remove the lock from
> sendCaptureResults() completely, and call the function with the lock
> held in completeDescriptor().
Yes, provided sendCaptureResults() is only called completeDescriptor().
I don't want to get a implementation which uses completeDescriptor() and
sendCaptureResults() both for process_capture_result() callback.
>
>>> possible to do better. We could add entries to the map in
>>> processCaptureRequest() instead. We would need, here, to remove entries
>> I am not so sure about this, We could add entries in
>> processCaptureRequest() but it feels too early. Let me try to address
>> other comments, and I can iterate on this on top.
>>
>>> from the map if stream->process() fails, and complete the request if we
>>> remove the last entry (as otherwise, streamProcessingComplete() could
>>> complete all streams, then the last stream->process() call could fail,
>>> and nobody would complete the request).
>> This is still a bit tricky to handle. What's the de-facto criteria to
>> remove entries from the map (not just the error path) ?
>>
>> Assuming we hold the lock for entire duration of
>>
>> Locker.lock();
>>
>> for {
>> ...
>> int ret = stream->process(...);
>> if (ret) {
>> descriptor.pendingPostProcess_.erase(stream);
>> setBufferStatus(stream, error);
>>
>> if (descriptor.pendingPostProcess_.size() == 0)
>> completeDescriptor (error);
>>
>> }
>>
>> }
>> locker.unlock();
>>
>> ....
>>
>> streamProcessingComplete(..., bufferStatus)
>>
>> {
>>
>> Locker.lock();
>>
>> descriptor.pendingPostProcess_.erase(stream);
>> setBufferStatus(stream, bufferStatus);
>>
>> if (descriptor.pendingPostProcess_.size() == 0)
>> completeDescriptor (?);
>>
>> }
>>
>> the ? indicates with what status I should complete the descriptor? I
>> would probably need to iterate over all the descriptor's buffer status
>> to see if any of the (previous)buffer has failed to set error status on
>> the descriptor overall? I think so, we would still need a look up loop
>> here, I can't see deciphering this from the map.
> Can we record a post-processing status in the descriptor, setting it to
> success when constructor, and setting it to error when a post-processing
> fails ? Then you can inspect that state only when completing the
> request. A boolean flag (hasPostProcessingErrors for instance) could be
> enough.
I would take the boolean flag approach for now.
>
> Or we could reuse the status_ member variable for that. It's currently
> used both to tell if a descriptor is complete (when !pending) and what
> its status is. We could split that into a status flag that would only
> have success and error values, and a pending (or complete) boolean. This
> may be a better design, as it would allow us to update the request
> status as soon as an error is detected, and set the completion flag
> separately only when all processing has completed.
This is sane, which in interest of not increasing further scope of the
series, I am tempted to address this on top. I can record it with a \todo
>
>> Are we getting too complicated with this? I have started to feel so ...
>>
>>> Other designs may be possible.
>>>
>>>> }
>>>>
>>>> + if (needsPostProcessing) {
>>>> + if (processingStatus == Camera3RequestDescriptor::Status::Error) {
>>>> + descriptor->status_ = processingStatus;
>>>> + sendCaptureResults();
>>>> + }
>>>> +
>>>> + return;
>>>> + }
>>>> +
>>>> descriptor->status_ = Camera3RequestDescriptor::Status::Success;
>>>> sendCaptureResults();
>>>> }
>>>> @@ -1206,6 +1220,64 @@ void CameraDevice::sendCaptureResults()
>>>> }
>>>> }
>>>>
>>>> +void CameraDevice::setBufferStatus(CameraStream *cameraStream,
>>>> + Camera3RequestDescriptor::StreamBuffer &buffer,
>>>> + Camera3RequestDescriptor *request,
>>>> + Camera3RequestDescriptor::Status status)
>>>> +{
>>>> + /*
>>>> + * Return the FrameBuffer to the CameraStream now that we're
>>>> + * done processing it.
>>>> + */
>>>> + if (cameraStream->type() == CameraStream::Type::Internal)
>>>> + cameraStream->putBuffer(buffer.internalBuffer);
>>>> +
>>>> + buffer.status = status;
>>>> + if (status != Camera3RequestDescriptor::Status::Success)
>>>> + notifyError(request->frameNumber_, buffer.stream->camera3Stream(),
>>>> + CAMERA3_MSG_ERROR_BUFFER);
>>>> +}
>>>> +
>>>> +void CameraDevice::streamProcessingComplete(CameraStream *cameraStream,
>>>> + Camera3RequestDescriptor *request,
>>>> + Camera3RequestDescriptor::Status status)
>>>> +{
>>>> + MutexLocker locker(request->streamsProcessMutex_);
>>>> + for (auto &buffer : request->buffers_) {
>>>> + if (buffer.stream != cameraStream)
>>>> + continue;
>>>> +
>>>> + setBufferStatus(cameraStream, buffer, request, status);
>>>> + }
>>>> +
>>>> + bool hasPostProcessingErrors = false;
>>>> + for (auto &buffer : request->buffers_) {
>>>> + if (cameraStream->type() == CameraStream::Type::Direct)
>>>> + continue;
>>>> +
>>>> + /*
>>>> + * Other eligible buffers might be waiting to get post-processed.
>>>> + * So wait for their turn before sendCaptureResults() for the
>>>> + * descriptor.
>>>> + */
>>>> + if (buffer.status == Camera3RequestDescriptor::Status::Pending)
>>>> + return;
>>>> +
>>>> + if (!hasPostProcessingErrors &&
>>>> + buffer.status == Camera3RequestDescriptor::Status::Error)
>>>> + hasPostProcessingErrors = true;
>>>> + }
>>>> +
>>>> + if (hasPostProcessingErrors)
>>>> + request->status_ = Camera3RequestDescriptor::Status::Error;
>>>> + else
>>>> + request->status_ = Camera3RequestDescriptor::Status::Success;
>>>> +
>>>> + locker.unlock();
>>>> +
>>>> + sendCaptureResults();
>>>> +}
>>>> +
>>>> std::string CameraDevice::logPrefix() const
>>>> {
>>>> return "'" + camera_->id() + "'";
>>>> diff --git a/src/android/camera_device.h b/src/android/camera_device.h
>>>> index 863cf414..1ef933da 100644
>>>> --- a/src/android/camera_device.h
>>>> +++ b/src/android/camera_device.h
>>>> @@ -66,6 +66,9 @@ public:
>>>> int configureStreams(camera3_stream_configuration_t *stream_list);
>>>> int processCaptureRequest(camera3_capture_request_t *request);
>>>> void requestComplete(libcamera::Request *request);
>>>> + void streamProcessingComplete(CameraStream *cameraStream,
>>>> + Camera3RequestDescriptor *request,
>>>> + Camera3RequestDescriptor::Status status);
>>>>
>>>> protected:
>>>> std::string logPrefix() const override;
>>>> @@ -94,6 +97,10 @@ private:
>>>> camera3_error_msg_code code) const;
>>>> int processControls(Camera3RequestDescriptor *descriptor);
>>>> void sendCaptureResults();
>>>> + void setBufferStatus(CameraStream *cameraStream,
>>>> + Camera3RequestDescriptor::StreamBuffer &buffer,
>>>> + Camera3RequestDescriptor *request,
>>>> + Camera3RequestDescriptor::Status status);
>>>> std::unique_ptr<CameraMetadata> getResultMetadata(
>>>> const Camera3RequestDescriptor &descriptor) const;
>>>>
>>>> diff --git a/src/android/camera_request.h b/src/android/camera_request.h
>>>> index 05dabf89..3a2774e0 100644
>>>> --- a/src/android/camera_request.h
>>>> +++ b/src/android/camera_request.h
>>>> @@ -8,6 +8,7 @@
>>>> #define __ANDROID_CAMERA_REQUEST_H__
>>>>
>>>> #include <memory>
>>>> +#include <mutex>
>>>> #include <vector>
>>>>
>>>> #include <libcamera/base/class.h>
>>>> @@ -37,6 +38,7 @@ public:
>>>> std::unique_ptr<libcamera::FrameBuffer> frameBuffer;
>>>> int fence;
>>>> Status status;
>>>> + libcamera::FrameBuffer *internalBuffer = nullptr;
>>>> };
>>>>
>>>> Camera3RequestDescriptor(libcamera::Camera *camera,
>>>> @@ -47,6 +49,8 @@ public:
>>>>
>>>> uint32_t frameNumber_ = 0;
>>>>
>>>> + /* Protects buffers_ for post-processing streams. */
>>>> + std::mutex streamsProcessMutex_;
>>>> std::vector<StreamBuffer> buffers_;
>>>>
>>>> CameraMetadata settings_;
>>>> diff --git a/src/android/camera_stream.cpp b/src/android/camera_stream.cpp
>>>> index f44a2717..04cbef8c 100644
>>>> --- a/src/android/camera_stream.cpp
>>>> +++ b/src/android/camera_stream.cpp
>>>> @@ -22,6 +22,7 @@
>>>> #include "camera_capabilities.h"
>>>> #include "camera_device.h"
>>>> #include "camera_metadata.h"
>>>> +#include "post_processor.h"
>>>>
>>>> using namespace libcamera;
>>>>
>>>> @@ -97,6 +98,18 @@ int CameraStream::configure()
>>>> int ret = postProcessor_->configure(configuration(), output);
>>>> if (ret)
>>>> return ret;
>>>> +
>>>> + postProcessor_->processComplete.connect(
>>>> + this, [&](Camera3RequestDescriptor *request, PostProcessor::Status status) {
>>>> + Camera3RequestDescriptor::Status bufferStatus =
>>>> + Camera3RequestDescriptor::Status::Error;
>>>> +
>>>> + if (status == PostProcessor::Status::Success)
>>>> + bufferStatus = Camera3RequestDescriptor::Status::Success;
>>> It looks weird to make Error a default (and thus special) case. Would
>>> any of the following be better ?
>>>
>>> Camera3RequestDescriptor::Status bufferStatus;
>>>
>>> if (status == PostProcessor::Status::Success)
>>> bufferStatus = Camera3RequestDescriptor::Status::Success;
>>> else
>>> bufferStatus = Camera3RequestDescriptor::Status::Error;
>>>
>>>
>>> Camera3RequestDescriptor::Status bufferStatus =
>>> status == PostProcessor::Status::Success ?
>>> Camera3RequestDescriptor::Status::Success :
>>> Camera3RequestDescriptor::Status::Error;
>>>
>>> I think I like the second option best.
>>>
>>>> +
>>>> + cameraDevice_->streamProcessingComplete(this, request,
>>>> + bufferStatus);
>>>> + });
>>>> }
>>>>
>>>> if (type_ == Type::Internal) {
>>>> diff --git a/src/android/jpeg/post_processor_jpeg.cpp b/src/android/jpeg/post_processor_jpeg.cpp
>>>> index 699576ef..a001fede 100644
>>>> --- a/src/android/jpeg/post_processor_jpeg.cpp
>>>> +++ b/src/android/jpeg/post_processor_jpeg.cpp
>>>> @@ -198,6 +198,7 @@ int PostProcessorJpeg::process(const FrameBuffer &source,
>>>> exif.data(), quality);
>>>> if (jpeg_size < 0) {
>>>> LOG(JPEG, Error) << "Failed to encode stream image";
>>>> + processComplete.emit(request, PostProcessor::Status::Error);
>>> I think you can write Status::Error here and below, including in
>>> post_processor_yuv.cpp (not above in camera_stream.cpp though).
>>>
>>>> return jpeg_size;
>>>> }
>>>>
>>>> @@ -211,6 +212,7 @@ int PostProcessorJpeg::process(const FrameBuffer &source,
>>>>
>>>> /* Update the JPEG result Metadata. */
>>>> resultMetadata->addEntry(ANDROID_JPEG_SIZE, jpeg_size);
>>>> + processComplete.emit(request, PostProcessor::Status::Success);
>>>>
>>>> return 0;
>>>> }
>>>> diff --git a/src/android/post_processor.h b/src/android/post_processor.h
>>>> index 27eaef88..14f5e8c7 100644
>>>> --- a/src/android/post_processor.h
>>>> +++ b/src/android/post_processor.h
>>>> @@ -7,6 +7,8 @@
>>>> #ifndef __ANDROID_POST_PROCESSOR_H__
>>>> #define __ANDROID_POST_PROCESSOR_H__
>>>>
>>>> +#include <libcamera/base/signal.h>
>>>> +
>>>> #include <libcamera/framebuffer.h>
>>>> #include <libcamera/stream.h>
>>>>
>>>> @@ -17,6 +19,11 @@ class Camera3RequestDescriptor;
>>>> class PostProcessor
>>>> {
>>>> public:
>>>> + enum class Status {
>>>> + Error,
>>>> + Success
>>>> + };
>>>> +
>>>> virtual ~PostProcessor() = default;
>>>>
>>>> virtual int configure(const libcamera::StreamConfiguration &inCfg,
>>>> @@ -24,6 +31,8 @@ public:
>>>> virtual int process(const libcamera::FrameBuffer &source,
>>>> CameraBuffer *destination,
>>>> Camera3RequestDescriptor *request) = 0;
>>>> +
>>>> + libcamera::Signal<Camera3RequestDescriptor *, Status> processComplete;
>>>> };
>>>>
>>>> #endif /* __ANDROID_POST_PROCESSOR_H__ */
>>>> diff --git a/src/android/yuv/post_processor_yuv.cpp b/src/android/yuv/post_processor_yuv.cpp
>>>> index 8110a1f1..fd364741 100644
>>>> --- a/src/android/yuv/post_processor_yuv.cpp
>>>> +++ b/src/android/yuv/post_processor_yuv.cpp
>>>> @@ -51,14 +51,17 @@ int PostProcessorYuv::configure(const StreamConfiguration &inCfg,
>>>>
>>>> int PostProcessorYuv::process(const FrameBuffer &source,
>>>> CameraBuffer *destination,
>>>> - [[maybe_unused]] Camera3RequestDescriptor *request)
>>>> + Camera3RequestDescriptor *request)
>>>> {
>>>> - if (!isValidBuffers(source, *destination))
>>>> + if (!isValidBuffers(source, *destination)) {
>>>> + processComplete.emit(request, PostProcessor::Status::Error);
>>>> return -EINVAL;
>>>> + }
>>>>
>>>> const MappedFrameBuffer sourceMapped(&source, MappedFrameBuffer::MapFlag::Read);
>>>> if (!sourceMapped.isValid()) {
>>>> LOG(YUV, Error) << "Failed to mmap camera frame buffer";
>>>> + processComplete.emit(request, PostProcessor::Status::Error);
>>>> return -EINVAL;
>>>> }
>>>>
>>>> @@ -76,9 +79,12 @@ int PostProcessorYuv::process(const FrameBuffer &source,
>>>> libyuv::FilterMode::kFilterBilinear);
>>>> if (ret) {
>>>> LOG(YUV, Error) << "Failed NV12 scaling: " << ret;
>>>> + processComplete.emit(request, PostProcessor::Status::Error);
>>>> return -EINVAL;
>>>> }
>>>>
>>>> + processComplete.emit(request, PostProcessor::Status::Success);
>>>> +
>>>> return 0;
>>>> }
>>>>
More information about the libcamera-devel
mailing list