[libcamera-devel] [PATCH v5 1/4] android: Notify post processing completion via a signal

Umang Jain umang.jain at ideasonboard.com
Thu Oct 21 16:34:18 CEST 2021


Hi Laurent,

On 10/21/21 7:22 PM, Laurent Pinchart wrote:
> Hi Umang,
>
> On Thu, Oct 21, 2021 at 11:05:44AM +0530, Umang Jain wrote:
>> On 10/21/21 6:01 AM, Laurent Pinchart wrote:
>>> On Wed, Oct 20, 2021 at 04:12:09PM +0530, Umang Jain wrote:
>>>> Notify that the post processing for a request has been completed,
>>>> via a signal. A pointer to the descriptor which is tracking the
>>>> capture request is emitted along with the status of post processed
>>>> buffer. The function CameraDevice::streamProcessingComplete() will
>>>> finally set the status on the request descriptor and send capture
>>>> results back to the framework accordingly.
>>>>
>>>> We also need to save a pointer to any internal buffers that might have
>>>> been allocated by CameraStream. The buffer should be returned back to
>>>> CameraStream just before capture results are sent.
>>>>
>>>> A streamsProcessMutex_ has been introduced here itself, which will be
>>> Did you mean s/itself/as well/ ?
>>>
>>>> applicable to guard access to descriptor->buffers_ when post-processing
>>>> is moved to be asynchronous in subsequent commits.
>>>>
>>>> Signed-off-by: Umang Jain <umang.jain at ideasonboard.com>
>>>> ---
>>>>    src/android/camera_device.cpp            | 92 +++++++++++++++++++++---
>>>>    src/android/camera_device.h              |  7 ++
>>>>    src/android/camera_request.h             |  4 ++
>>>>    src/android/camera_stream.cpp            | 13 ++++
>>>>    src/android/jpeg/post_processor_jpeg.cpp |  2 +
>>>>    src/android/post_processor.h             |  9 +++
>>>>    src/android/yuv/post_processor_yuv.cpp   | 10 ++-
>>>>    7 files changed, 125 insertions(+), 12 deletions(-)
>>>>
>>>> diff --git a/src/android/camera_device.cpp b/src/android/camera_device.cpp
>>>> index 806b4090..541c2c81 100644
>>>> --- a/src/android/camera_device.cpp
>>>> +++ b/src/android/camera_device.cpp
>>>> @@ -1117,6 +1117,15 @@ void CameraDevice::requestComplete(Request *request)
>>>>    	}
>>>>    
>>>>    	/* Handle post-processing. */
>>>> +	bool needsPostProcessing = false;
>>>> +	Camera3RequestDescriptor::Status processingStatus =
>>>> +		Camera3RequestDescriptor::Status::Pending;
>>>> +	/*
>>>> +	 * \todo Apply streamsProcessMutex_ when post-processing is adapted to run
>>>> +	 * asynchronously. If we introduce the streamsProcessMutex_ right away, the
>>>> +	 * lock will be held forever since it is synchronous at this point
>>>> +	 * (see streamProcessingComplete).
>>>> +	 */
>>>>    	for (auto &buffer : descriptor->buffers_) {
>>>>    		CameraStream *stream = buffer.stream;
>>>>    
>>>> @@ -1132,22 +1141,27 @@ void CameraDevice::requestComplete(Request *request)
>>>>    			continue;
>>>>    		}
>>>>    
>>>> -		int ret = stream->process(*src, buffer, descriptor);
>>>> -
>>>> -		/*
>>>> -		 * Return the FrameBuffer to the CameraStream now that we're
>>>> -		 * done processing it.
>>>> -		 */
>>>>    		if (stream->type() == CameraStream::Type::Internal)
>>>> -			stream->putBuffer(src);
>>>> +			buffer.internalBuffer = src;
>>> Let's make the field name more self-explanatory. A possible candidate is
>>> sourceBuffer instead of internalBuffer.
>>>
>>> Could we do this in CameraDevice::processCaptureRequest(), just after
>>> calling cameraStream->getBuffer() ? I'll feel less worried about a leak
>>> if we store the buffer for later release right after acquiring it.
>> Ack.
>>
>>>>    
>>>> +		needsPostProcessing = true;
>>>> +		int ret = stream->process(*src, buffer, descriptor);
>>>>    		if (ret) {
>>>> -			buffer.status = Camera3RequestDescriptor::Status::Error;
>>>> -			notifyError(descriptor->frameNumber_, stream->camera3Stream(),
>>>> -				    CAMERA3_MSG_ERROR_BUFFER);
>>>> +			setBufferStatus(stream, buffer, descriptor,
>>>> +					Camera3RequestDescriptor::Status::Error);
>>>> +			processingStatus = Camera3RequestDescriptor::Status::Error;
>>>>    		}
>>> I think we need to improve error handling a little bit, in order to
>>> support multiple streams being post-processed. We need to complete the
>>> request in CameraDevice::streamProcessingComplete() when the last
>>> processing stream completes, so the number of post-processed streams
>>> needs to be somehow recorded in the Camera3RequestDescriptor. It could be
>>> tempting to have a counter, but a
>> I have implemented a counter-based solution, but it still didn't address
>> all the corner cases. Since we have two paths of error handling
>> (synchronous error and asynchronous error), a counter-based
>> implementation was probably too verbose, with a lot of code duplication
>> to handle the various scenarios.
>>
>>> 	std::map<CameraStream *, StreamBuffer *> pendingPostProcess_;
>>>
>>> could be better, as it will also remove the need for loops in
>>> streamProcessingComplete() to lookup the buffer from the stream and to
>>> check if the request has been fully processed. Accessing the map has to
>>> be done with the mutex held to avoid race conditions, but it's a bit
>>> more tricky than that. If we just add elements to the map here, we could
>> Good, so you do understand so many looping lookups in
>> streamProcessingComplete.
>>
>> I broadly feel okay with std::map<CameraStream *, StreamBuffer *>
>> pendingPostProcess_; as long as it resides inside the descriptor. For
>> me, it avoids lookups, and can address the \todo in 3/4.
>>
>> Having said that, I feel a bit wary of introducing more data structures
>> for management. We already have a queue of post-processing (convert to
>> std::deque if we want to access the queued requests for post-processing)
>> and now we will have another map to track the requests. I wonder if
>> we can get away with a PostProcessorWorker's std::deque, with the
>> benefits of the map. But again, we have to take care of exposure / mutex
>> with the main thread, which can lead to subtle bugs and unwanted waiting
>> on each thread.
>>
>> Now that I think about it, the pendingPostProcess_ map is about a Stream's
>> pending processing _per descriptor_, whereas the PostProcessorWorker's
>> queue is a replacement for Thread's message queue. Hmm.
> That's right, those two data structures are independent from each other.
>
>>> race with streamProcessingComplete() as it could be called for the first
>>> post-processed stream before we have a chance to add the second one
>>> here. streamProcessingComplete() would then incorrectly think it has
>>> completed the last stream, and proceed to complete the request.
>>>
>>> This race could be solved by holding the lock for the whole duration of
>>> the loop here, covering all calls to stream->process(), but it may be
>> Yes, I am considering this in the current implementation too. The way it
>> will look is: first queue up all the post-processing requests to
>> PostProcessorWorker, and only then allow any of the
>> slot(streamProcessingComplete) calls to get executed. A simple mutex hold
>> can do that.
> I usually try to minimize the amount of code covered by locks, and focus
> on data access only if possible instead of keeping the lock held over
> the callbacks. Let's see if that's possible. There's a tricky part
> though, as sendCaptureResults() is called from two different threads
> (synchronously with request queuing in error paths and asynchronously in
> the completion path), we currently have a race condition.
>
> Thread A							Thread B
> -----------------------------------------------------------------------------------------------------------
>
> MutexLocker lock(descriptorsMutex_);
> while (!descriptors_.empty() &&
>         !descriptors_.front()->isPending()) {
>          auto descriptor = std::move(descriptors_.front());
>          descriptors_.pop();
>
>          lock.unlock();
> 								MutexLocker lock(descriptorsMutex_);
> 								while (!descriptors_.empty() &&
> 								       !descriptors_.front()->isPending()) {
> 									auto descriptor = std::move(descriptors_.front());
> 									descriptors_.pop();
>
> 									lock.unlock();
>
> 									...
>
> 									callbacks_->process_capture_result(callbacks_,
> 													   &captureResult);
>
> 									lock.lock();
> 								}
> 	...
>
>          callbacks_->process_capture_result(callbacks_,
> 					   &captureResult);
>
>          lock.lock();
> }
>
> Thread A gets the request at the top of the queue, thread B gets the
> next one, and signals its completion before thread A.
>
> It's likely possible to implement something clever here, to minimize the
> locked sections and still be correct, but that may be unnecessary
> optimization. We could start by holding the lock for the whole duration
> of the function. In that case, I would remove the lock from
> sendCaptureResults() completely, and call the function with the lock
> held in completeDescriptor().


Yes, provided sendCaptureResults() is only called from completeDescriptor().
I don't want to end up with an implementation which uses both
completeDescriptor() and sendCaptureResults() for the
process_capture_result() callback.
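
To make sure we mean the same thing, here is a minimal standalone sketch of
that arrangement. It is not the HAL code: Descriptor and Device are
hypothetical, stripped-down stand-ins, std::lock_guard replaces MutexLocker,
and delivered_ stands in for the process_capture_result() callback. The only
point is that completeDescriptor() is the single entry point and that
sendCaptureResults() runs with the lock held, so results cannot overtake each
other.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <mutex>
#include <queue>
#include <utility>
#include <vector>

/* Hypothetical, stripped-down stand-in for Camera3RequestDescriptor. */
struct Descriptor {
	uint32_t frameNumber = 0;
	bool pending = true;
};

/* Hypothetical stand-in for the relevant part of CameraDevice. */
class Device
{
public:
	Descriptor *queueRequest(uint32_t frameNumber)
	{
		std::lock_guard<std::mutex> lock(descriptorsMutex_);
		descriptors_.push(std::make_unique<Descriptor>());
		descriptors_.back()->frameNumber = frameNumber;
		return descriptors_.back().get();
	}

	/* Single completion entry point, callable from any thread. */
	void completeDescriptor(Descriptor *descriptor)
	{
		assert(descriptor != nullptr);

		std::lock_guard<std::mutex> lock(descriptorsMutex_);
		descriptor->pending = false;
		sendCaptureResults();
	}

	/* Frame numbers in the order they were delivered. */
	std::vector<uint32_t> delivered()
	{
		std::lock_guard<std::mutex> lock(descriptorsMutex_);
		return delivered_;
	}

private:
	/* Must be called with descriptorsMutex_ held. */
	void sendCaptureResults()
	{
		while (!descriptors_.empty() && !descriptors_.front()->pending) {
			std::unique_ptr<Descriptor> descriptor =
				std::move(descriptors_.front());
			descriptors_.pop();

			/* Stand-in for callbacks_->process_capture_result(). */
			delivered_.push_back(descriptor->frameNumber);
		}
	}

	std::mutex descriptorsMutex_;
	std::queue<std::unique_ptr<Descriptor>> descriptors_;
	std::vector<uint32_t> delivered_;
};
```

The cost is that the callback runs under the lock, which is the optimization
we would be giving up for now in exchange for in-order delivery.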

>
>>> possible to do better. We could add entries to the map in
>>> processCaptureRequest() instead. We would need, here, to remove entries
>> I am not so sure about this. We could add entries in
>> processCaptureRequest() but it feels too early. Let me try to address
>> other comments, and I can iterate on this on top.
>>
>>> from the map if stream->process() fails, and complete the request if we
>>> remove the last entry (as otherwise, streamProcessingComplete() could
>>> complete all streams, then the last stream->process() call could fail,
>>> and nobody would complete the request).
>> This is still a bit tricky to handle. What are the de-facto criteria for
>> removing entries from the map (not just in the error path) ?
>>
>> Assuming we hold the lock for the entire duration of the loop:
>>
>>       locker.lock();
>>
>>       for (...) {
>>           ...
>>           int ret = stream->process(...);
>>           if (ret) {
>>               descriptor.pendingPostProcess_.erase(stream);
>>               setBufferStatus(stream, error);
>>
>>               if (descriptor.pendingPostProcess_.size() == 0)
>>                   completeDescriptor(error);
>>           }
>>       }
>>
>>       locker.unlock();
>>
>> ....
>>
>> streamProcessingComplete(..., bufferStatus)
>> {
>>       locker.lock();
>>
>>       descriptor.pendingPostProcess_.erase(stream);
>>       setBufferStatus(stream, bufferStatus);
>>
>>       if (descriptor.pendingPostProcess_.size() == 0)
>>           completeDescriptor(?);
>> }
>>
>> The ? marks the open question: with what status should I complete the
>> descriptor? I would probably need to iterate over all the descriptor's
>> buffer statuses to see if any (previous) buffer has failed, in order to
>> set the error status on the descriptor overall. So I think we would still
>> need a lookup loop here; I can't see how to decipher this from the map.
> Can we record a post-processing status in the descriptor, setting it to
> success when constructed, and setting it to error when a post-processing
> step fails ? Then you can inspect that state only when completing the
> request. A boolean flag (hasPostProcessingErrors for instance) could be
> enough.


I would take the boolean flag approach for now.
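
Roughly what I have in mind, as a standalone sketch rather than the actual
patch: Stream, StreamBuffer and Descriptor below are hypothetical reduced
types, and the free function returns whether this call finished the
descriptor instead of calling completeDescriptor(). It assumes the map was
fully populated before any processing started, which is the part we still
need to get right.

```cpp
#include <cassert>
#include <map>
#include <mutex>

/* Hypothetical stand-ins for CameraStream and StreamBuffer. */
struct Stream {
};

struct StreamBuffer {
	bool error = false;
};

/* Hypothetical reduced descriptor holding the map and the flag. */
struct Descriptor {
	std::mutex streamsProcessMutex_;
	std::map<Stream *, StreamBuffer *> pendingPostProcess_;
	bool hasPostProcessingErrors_ = false;
	bool complete_ = false;
};

/*
 * Record the result for one stream. Returns true if this call removed the
 * last pending entry and therefore completed the descriptor.
 */
bool streamProcessingComplete(Descriptor &descriptor, Stream *stream,
			      bool success)
{
	std::lock_guard<std::mutex> lock(descriptor.streamsProcessMutex_);

	auto it = descriptor.pendingPostProcess_.find(stream);
	assert(it != descriptor.pendingPostProcess_.end());

	/* Per-buffer status, then the sticky per-descriptor flag. */
	it->second->error = !success;
	if (!success)
		descriptor.hasPostProcessingErrors_ = true;

	descriptor.pendingPostProcess_.erase(it);
	if (!descriptor.pendingPostProcess_.empty())
		return false;

	/* Last stream done: no loop over the buffers is needed here. */
	descriptor.complete_ = true;
	return true;
}
```

The flag answers the "with what status" question at completion time without
iterating over the buffers again.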

>
> Or we could reuse the status_ member variable for that. It's currently
> used both to tell if a descriptor is complete (when !pending) and what
> its status is. We could split that into a status flag that would only
> have success and error values, and a pending (or complete) boolean. This
> may be a better design, as it would allow us to update the request
> status as soon as an error is detected, and set the completion flag
> separately only when all processing has completed.


This is sane, but in the interest of not increasing the scope of the series
further, I am tempted to address this on top. I can record it with a \todo.
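
For the record, the split could look something like the sketch below. This is
only my reading of the suggestion, with a hypothetical reduced Descriptor
class and hypothetical accessor names; none of it exists in the tree today.

```cpp
#include <cassert>

/* Hypothetical reduced descriptor with status and completion split. */
class Descriptor
{
public:
	enum class Status {
		Success,
		Error,
	};

	/* May be called as soon as any error is detected. */
	void setError() { status_ = Status::Error; }

	/* Called once, when all processing for the request has finished. */
	void setComplete()
	{
		assert(!complete_);
		complete_ = true;
	}

	bool isPending() const { return !complete_; }
	Status status() const { return status_; }

private:
	Status status_ = Status::Success;
	bool complete_ = false;
};
```

With this, an early error no longer makes the descriptor look complete, as
isPending() depends on the completion flag alone.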

>
>> Are we getting too complicated with this? I have started to feel so ...
>>
>>> Other designs may be possible.
>>>
>>>>    	}
>>>>    
>>>> +	if (needsPostProcessing) {
>>>> +		if (processingStatus == Camera3RequestDescriptor::Status::Error) {
>>>> +			descriptor->status_ = processingStatus;
>>>> +			sendCaptureResults();
>>>> +		}
>>>> +
>>>> +		return;
>>>> +	}
>>>> +
>>>>    	descriptor->status_ = Camera3RequestDescriptor::Status::Success;
>>>>    	sendCaptureResults();
>>>>    }
>>>> @@ -1206,6 +1220,64 @@ void CameraDevice::sendCaptureResults()
>>>>    	}
>>>>    }
>>>>    
>>>> +void CameraDevice::setBufferStatus(CameraStream *cameraStream,
>>>> +				   Camera3RequestDescriptor::StreamBuffer &buffer,
>>>> +				   Camera3RequestDescriptor *request,
>>>> +				   Camera3RequestDescriptor::Status status)
>>>> +{
>>>> +	/*
>>>> +	 * Return the FrameBuffer to the CameraStream now that we're
>>>> +	 * done processing it.
>>>> +	 */
>>>> +	if (cameraStream->type() == CameraStream::Type::Internal)
>>>> +		cameraStream->putBuffer(buffer.internalBuffer);
>>>> +
>>>> +	buffer.status = status;
>>>> +	if (status != Camera3RequestDescriptor::Status::Success)
>>>> +		notifyError(request->frameNumber_, buffer.stream->camera3Stream(),
>>>> +			    CAMERA3_MSG_ERROR_BUFFER);
>>>> +}
>>>> +
>>>> +void CameraDevice::streamProcessingComplete(CameraStream *cameraStream,
>>>> +					    Camera3RequestDescriptor *request,
>>>> +					    Camera3RequestDescriptor::Status status)
>>>> +{
>>>> +	MutexLocker locker(request->streamsProcessMutex_);
>>>> +	for (auto &buffer : request->buffers_) {
>>>> +		if (buffer.stream != cameraStream)
>>>> +			continue;
>>>> +
>>>> +		setBufferStatus(cameraStream, buffer, request, status);
>>>> +	}
>>>> +
>>>> +	bool hasPostProcessingErrors = false;
>>>> +	for (auto &buffer : request->buffers_) {
>>>> +		if (cameraStream->type() == CameraStream::Type::Direct)
>>>> +			continue;
>>>> +
>>>> +		/*
>>>> +		 * Other eligible buffers might be waiting to get post-processed.
>>>> +		 * So wait for their turn before sendCaptureResults() for the
>>>> +		 * descriptor.
>>>> +		 */
>>>> +		if (buffer.status == Camera3RequestDescriptor::Status::Pending)
>>>> +			return;
>>>> +
>>>> +		if (!hasPostProcessingErrors &&
>>>> +		    buffer.status == Camera3RequestDescriptor::Status::Error)
>>>> +			hasPostProcessingErrors = true;
>>>> +	}
>>>> +
>>>> +	if (hasPostProcessingErrors)
>>>> +		request->status_ = Camera3RequestDescriptor::Status::Error;
>>>> +	else
>>>> +		request->status_ = Camera3RequestDescriptor::Status::Success;
>>>> +
>>>> +	locker.unlock();
>>>> +
>>>> +	sendCaptureResults();
>>>> +}
>>>> +
>>>>    std::string CameraDevice::logPrefix() const
>>>>    {
>>>>    	return "'" + camera_->id() + "'";
>>>> diff --git a/src/android/camera_device.h b/src/android/camera_device.h
>>>> index 863cf414..1ef933da 100644
>>>> --- a/src/android/camera_device.h
>>>> +++ b/src/android/camera_device.h
>>>> @@ -66,6 +66,9 @@ public:
>>>>    	int configureStreams(camera3_stream_configuration_t *stream_list);
>>>>    	int processCaptureRequest(camera3_capture_request_t *request);
>>>>    	void requestComplete(libcamera::Request *request);
>>>> +	void streamProcessingComplete(CameraStream *cameraStream,
>>>> +				      Camera3RequestDescriptor *request,
>>>> +				      Camera3RequestDescriptor::Status status);
>>>>    
>>>>    protected:
>>>>    	std::string logPrefix() const override;
>>>> @@ -94,6 +97,10 @@ private:
>>>>    			 camera3_error_msg_code code) const;
>>>>    	int processControls(Camera3RequestDescriptor *descriptor);
>>>>    	void sendCaptureResults();
>>>> +	void setBufferStatus(CameraStream *cameraStream,
>>>> +			     Camera3RequestDescriptor::StreamBuffer &buffer,
>>>> +			     Camera3RequestDescriptor *request,
>>>> +			     Camera3RequestDescriptor::Status status);
>>>>    	std::unique_ptr<CameraMetadata> getResultMetadata(
>>>>    		const Camera3RequestDescriptor &descriptor) const;
>>>>    
>>>> diff --git a/src/android/camera_request.h b/src/android/camera_request.h
>>>> index 05dabf89..3a2774e0 100644
>>>> --- a/src/android/camera_request.h
>>>> +++ b/src/android/camera_request.h
>>>> @@ -8,6 +8,7 @@
>>>>    #define __ANDROID_CAMERA_REQUEST_H__
>>>>    
>>>>    #include <memory>
>>>> +#include <mutex>
>>>>    #include <vector>
>>>>    
>>>>    #include <libcamera/base/class.h>
>>>> @@ -37,6 +38,7 @@ public:
>>>>    		std::unique_ptr<libcamera::FrameBuffer> frameBuffer;
>>>>    		int fence;
>>>>    		Status status;
>>>> +		libcamera::FrameBuffer *internalBuffer = nullptr;
>>>>    	};
>>>>    
>>>>    	Camera3RequestDescriptor(libcamera::Camera *camera,
>>>> @@ -47,6 +49,8 @@ public:
>>>>    
>>>>    	uint32_t frameNumber_ = 0;
>>>>    
>>>> +	/* Protects buffers_ for post-processing streams. */
>>>> +	std::mutex streamsProcessMutex_;
>>>>    	std::vector<StreamBuffer> buffers_;
>>>>    
>>>>    	CameraMetadata settings_;
>>>> diff --git a/src/android/camera_stream.cpp b/src/android/camera_stream.cpp
>>>> index f44a2717..04cbef8c 100644
>>>> --- a/src/android/camera_stream.cpp
>>>> +++ b/src/android/camera_stream.cpp
>>>> @@ -22,6 +22,7 @@
>>>>    #include "camera_capabilities.h"
>>>>    #include "camera_device.h"
>>>>    #include "camera_metadata.h"
>>>> +#include "post_processor.h"
>>>>    
>>>>    using namespace libcamera;
>>>>    
>>>> @@ -97,6 +98,18 @@ int CameraStream::configure()
>>>>    		int ret = postProcessor_->configure(configuration(), output);
>>>>    		if (ret)
>>>>    			return ret;
>>>> +
>>>> +		postProcessor_->processComplete.connect(
>>>> +			this, [&](Camera3RequestDescriptor *request, PostProcessor::Status status) {
>>>> +				Camera3RequestDescriptor::Status bufferStatus =
>>>> +					Camera3RequestDescriptor::Status::Error;
>>>> +
>>>> +				if (status == PostProcessor::Status::Success)
>>>> +					bufferStatus = Camera3RequestDescriptor::Status::Success;
>>> It looks weird to make Error a default (and thus special) case. Would
>>> any of the following be better ?
>>>
>>> 				Camera3RequestDescriptor::Status bufferStatus;
>>>
>>> 				if (status == PostProcessor::Status::Success)
>>> 					bufferStatus = Camera3RequestDescriptor::Status::Success;
>>> 				else
>>> 					bufferStatus = Camera3RequestDescriptor::Status::Error;
>>>
>>>
>>> 				Camera3RequestDescriptor::Status bufferStatus =
>>> 					status == PostProcessor::Status::Success ?
>>> 					Camera3RequestDescriptor::Status::Success :
>>> 					Camera3RequestDescriptor::Status::Error;
>>>
>>> I think I like the second option best.
>>>
>>>> +
>>>> +				cameraDevice_->streamProcessingComplete(this, request,
>>>> +									bufferStatus);
>>>> +			});
>>>>    	}
>>>>    
>>>>    	if (type_ == Type::Internal) {
>>>> diff --git a/src/android/jpeg/post_processor_jpeg.cpp b/src/android/jpeg/post_processor_jpeg.cpp
>>>> index 699576ef..a001fede 100644
>>>> --- a/src/android/jpeg/post_processor_jpeg.cpp
>>>> +++ b/src/android/jpeg/post_processor_jpeg.cpp
>>>> @@ -198,6 +198,7 @@ int PostProcessorJpeg::process(const FrameBuffer &source,
>>>>    					 exif.data(), quality);
>>>>    	if (jpeg_size < 0) {
>>>>    		LOG(JPEG, Error) << "Failed to encode stream image";
>>>> +		processComplete.emit(request, PostProcessor::Status::Error);
>>> I think you can write Status::Error here and below, including in
>>> post_processor_yuv.cpp (not above in camera_stream.cpp though).
>>>
>>>>    		return jpeg_size;
>>>>    	}
>>>>    
>>>> @@ -211,6 +212,7 @@ int PostProcessorJpeg::process(const FrameBuffer &source,
>>>>    
>>>>    	/* Update the JPEG result Metadata. */
>>>>    	resultMetadata->addEntry(ANDROID_JPEG_SIZE, jpeg_size);
>>>> +	processComplete.emit(request, PostProcessor::Status::Success);
>>>>    
>>>>    	return 0;
>>>>    }
>>>> diff --git a/src/android/post_processor.h b/src/android/post_processor.h
>>>> index 27eaef88..14f5e8c7 100644
>>>> --- a/src/android/post_processor.h
>>>> +++ b/src/android/post_processor.h
>>>> @@ -7,6 +7,8 @@
>>>>    #ifndef __ANDROID_POST_PROCESSOR_H__
>>>>    #define __ANDROID_POST_PROCESSOR_H__
>>>>    
>>>> +#include <libcamera/base/signal.h>
>>>> +
>>>>    #include <libcamera/framebuffer.h>
>>>>    #include <libcamera/stream.h>
>>>>    
>>>> @@ -17,6 +19,11 @@ class Camera3RequestDescriptor;
>>>>    class PostProcessor
>>>>    {
>>>>    public:
>>>> +	enum class Status {
>>>> +		Error,
>>>> +		Success
>>>> +	};
>>>> +
>>>>    	virtual ~PostProcessor() = default;
>>>>    
>>>>    	virtual int configure(const libcamera::StreamConfiguration &inCfg,
>>>> @@ -24,6 +31,8 @@ public:
>>>>    	virtual int process(const libcamera::FrameBuffer &source,
>>>>    			    CameraBuffer *destination,
>>>>    			    Camera3RequestDescriptor *request) = 0;
>>>> +
>>>> +	libcamera::Signal<Camera3RequestDescriptor *, Status> processComplete;
>>>>    };
>>>>    
>>>>    #endif /* __ANDROID_POST_PROCESSOR_H__ */
>>>> diff --git a/src/android/yuv/post_processor_yuv.cpp b/src/android/yuv/post_processor_yuv.cpp
>>>> index 8110a1f1..fd364741 100644
>>>> --- a/src/android/yuv/post_processor_yuv.cpp
>>>> +++ b/src/android/yuv/post_processor_yuv.cpp
>>>> @@ -51,14 +51,17 @@ int PostProcessorYuv::configure(const StreamConfiguration &inCfg,
>>>>    
>>>>    int PostProcessorYuv::process(const FrameBuffer &source,
>>>>    			      CameraBuffer *destination,
>>>> -			      [[maybe_unused]] Camera3RequestDescriptor *request)
>>>> +			      Camera3RequestDescriptor *request)
>>>>    {
>>>> -	if (!isValidBuffers(source, *destination))
>>>> +	if (!isValidBuffers(source, *destination)) {
>>>> +		processComplete.emit(request, PostProcessor::Status::Error);
>>>>    		return -EINVAL;
>>>> +	}
>>>>    
>>>>    	const MappedFrameBuffer sourceMapped(&source, MappedFrameBuffer::MapFlag::Read);
>>>>    	if (!sourceMapped.isValid()) {
>>>>    		LOG(YUV, Error) << "Failed to mmap camera frame buffer";
>>>> +		processComplete.emit(request, PostProcessor::Status::Error);
>>>>    		return -EINVAL;
>>>>    	}
>>>>    
>>>> @@ -76,9 +79,12 @@ int PostProcessorYuv::process(const FrameBuffer &source,
>>>>    				    libyuv::FilterMode::kFilterBilinear);
>>>>    	if (ret) {
>>>>    		LOG(YUV, Error) << "Failed NV12 scaling: " << ret;
>>>> +		processComplete.emit(request, PostProcessor::Status::Error);
>>>>    		return -EINVAL;
>>>>    	}
>>>>    
>>>> +	processComplete.emit(request, PostProcessor::Status::Success);
>>>> +
>>>>    	return 0;
>>>>    }
>>>>    