[libcamera-devel] [PATCH v7 7/7] android: post_processor: Make post processing async

Umang Jain umang.jain at ideasonboard.com
Tue Oct 26 06:39:22 CEST 2021


Hi Laurent,

On 10/26/21 3:35 AM, Laurent Pinchart wrote:
> Hi Umang,
>
> Thank you for the patch.
>
> On Tue, Oct 26, 2021 at 02:08:33AM +0530, Umang Jain wrote:
>> Introduce a dedicated worker class derived from libcamera::Thread.
>> The worker class maintains a queue of post-processing requests
>> and waits for a request to become available. Requests are handled
>> in FIFO order: each one is de-queued and then passed to the
>> post-processor.
>>
>> The entire post-processing handling iteration is locked under
>> streamProcessMutex_ which helps us to queue all the post-processing
> s/streamProcessMutex_/streamsProcessMutex_/
>
>> requests at once, before any post-processing completion slot
>> (streamProcessingComplete()) is allowed to run for post-processing
>> requests completing in parallel. This helps us to manage both
>> synchronous and asynchronous errors encountered during the entire
>> post processing operation. Since a post-processing operation can
>> even complete after CameraDevice::requestComplete() has returned,
>> we need to check and complete the descriptor from
>> streamProcessingComplete() running in the PostProcessorWorker's
>> thread.
>>
>> This patch also implements a flush() for the PostProcessorWorker
>> class which is responsible for purging post-processing requests
>> queued up while a camera is stopping/flushing. It is hooked to
>> CameraStream::flush(), which isn't used currently but will be
>> used when we handle flush/stop scenarios in greater detail
>> subsequently (in a different patchset).
>>
>> The libcamera request completion handler CameraDevice::requestComplete()
>> assumes that the request that has just completed is at the front of the
>> queue. Now that the post-processor runs asynchronously, this isn't true
>> anymore: a request being post-processed stays in the queue while a new
>> libcamera request may complete. Remove that assumption, and use the
>> request cookie to obtain the Camera3RequestDescriptor.
>>
>> Signed-off-by: Umang Jain <umang.jain at ideasonboard.com>
>> Signed-off-by: Laurent Pinchart <laurent.pinchart at ideasonboard.com>
>> ---
>>   src/android/camera_device.cpp |  44 ++++++---------
>>   src/android/camera_stream.cpp | 101 ++++++++++++++++++++++++++++++++--
>>   src/android/camera_stream.h   |  37 +++++++++++++
>>   3 files changed, 151 insertions(+), 31 deletions(-)
>>
>> diff --git a/src/android/camera_device.cpp b/src/android/camera_device.cpp
>> index 3ded0f7e..53ebe0ea 100644
>> --- a/src/android/camera_device.cpp
>> +++ b/src/android/camera_device.cpp
>> @@ -1027,29 +1027,8 @@ int CameraDevice::processCaptureRequest(camera3_capture_request_t *camera3Reques
>>   
>>   void CameraDevice::requestComplete(Request *request)
>>   {
>> -	Camera3RequestDescriptor *descriptor;
>> -	{
>> -		MutexLocker descriptorsLock(descriptorsMutex_);
>> -		ASSERT(!descriptors_.empty());
>> -		descriptor = descriptors_.front().get();
>> -	}
>> -
>> -	if (descriptor->request_->cookie() != request->cookie()) {
>> -		/*
>> -		 * \todo Clarify if the Camera has to be closed on
>> -		 * ERROR_DEVICE.
>> -		 */
>> -		LOG(HAL, Error)
>> -			<< "Out-of-order completion for request "
>> -			<< utils::hex(request->cookie());
>> -
>> -		MutexLocker descriptorsLock(descriptorsMutex_);
>> -		descriptors_.pop();
>> -
>> -		notifyError(0, nullptr, CAMERA3_MSG_ERROR_DEVICE);
>> -
>> -		return;
>> -	}
>> +	Camera3RequestDescriptor *descriptor =
>> +		reinterpret_cast<Camera3RequestDescriptor *>(request->cookie());
>>   
>>   	/*
>>   	 * Prepare the capture result for the Android camera stack.
>> @@ -1124,9 +1103,13 @@ void CameraDevice::requestComplete(Request *request)
>>   	}
>>   
>>   	/* Handle post-processing. */
>> +	MutexLocker locker(descriptor->streamsProcessMutex_);
>> +
>>   	/*
>> -	 * \todo Protect the loop below with streamProcessMutex_ when post
> In the patch that introduced this,
>
> s/streamProcessMutex_/streamsProcessMutex_/
>
>> -	 * processor runs asynchronously.
>> +	 * Queue all the post-processing stream requests at once. The completion
>> +	 * slot streamProcessingComplete() can only execute once we are out of
>> +	 * this critical section. This helps to handle synchronous errors right
>> +	 * here.
>>   	 */
>>   	auto iter = descriptor->pendingStreamsToProcess_.begin();
>>   	while (iter != descriptor->pendingStreamsToProcess_.end()) {
>> @@ -1158,8 +1141,10 @@ void CameraDevice::requestComplete(Request *request)
>>   		}
>>   	}
>>   
>> -	if (descriptor->pendingStreamsToProcess_.empty())
>> +	if (descriptor->pendingStreamsToProcess_.empty()) {
>> +		locker.unlock();
>>   		completeDescriptor(descriptor);
>> +	}
>>   }
>>   
>>   void CameraDevice::completeDescriptor(Camera3RequestDescriptor *descriptor)
>> @@ -1245,6 +1230,13 @@ void CameraDevice::streamProcessingComplete(Camera3RequestDescriptor::StreamBuff
>>   	MutexLocker locker(request->streamsProcessMutex_);
>>   
>>   	request->pendingStreamsToProcess_.erase(streamBuffer->stream);
>> +
>> +	if (!request->pendingStreamsToProcess_.empty())
>> +		return;
>> +
>> +	locker.unlock();
> Maybe
>
> 	{
> 		MutexLocker locker(request->streamsProcessMutex_);
>
> 		request->pendingStreamsToProcess_.erase(streamBuffer->stream);
> 		if (!request->pendingStreamsToProcess_.empty())
> 			return;
> 	}
>
> up to you.


OK, yes, this looks cleaner.
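
To make sure I read the suggestion right, here is a minimal standalone
sketch of that scoped-lock shape. It uses std::mutex and hypothetical
stand-ins (Descriptor, pendingStreams, an int stream id) instead of the
HAL types, so treat it as an illustration of the pattern only, not the
code that will land in v8.

```cpp
#include <cassert>
#include <mutex>
#include <set>

/* Hypothetical stand-in for Camera3RequestDescriptor, not the HAL type. */
struct Descriptor {
	std::mutex streamsProcessMutex;
	std::set<int> pendingStreams;
	bool completed = false;
};

/*
 * Stand-in for completeDescriptor(). It takes the mutex itself, so it
 * must only be called once the caller has released it.
 */
void completeDescriptor(Descriptor *d)
{
	std::lock_guard<std::mutex> lock(d->streamsProcessMutex);
	assert(!d->completed); /* A descriptor completes exactly once. */
	d->completed = true;
}

/* Returns true if this call completed the descriptor. */
bool streamProcessingComplete(Descriptor *d, int stream)
{
	{
		std::lock_guard<std::mutex> locker(d->streamsProcessMutex);

		d->pendingStreams.erase(stream);
		if (!d->pendingStreams.empty())
			return false;
	} /* The locker goes out of scope here, before completion runs. */

	completeDescriptor(d);
	return true;
}
```

The extra scope replaces the explicit locker.unlock(): the early return
and the fall-through path both release the mutex automatically.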

>
>> +
>> +	completeDescriptor(streamBuffer->request);
>>   }
>>   
>>   std::string CameraDevice::logPrefix() const
>> diff --git a/src/android/camera_stream.cpp b/src/android/camera_stream.cpp
>> index fed99022..9023c13c 100644
>> --- a/src/android/camera_stream.cpp
>> +++ b/src/android/camera_stream.cpp
>> @@ -99,6 +99,7 @@ int CameraStream::configure()
>>   		if (ret)
>>   			return ret;
>>   
>> +		worker_ = std::make_unique<PostProcessorWorker>(postProcessor_.get());
>>   		postProcessor_->processComplete.connect(
>>   			this, [&](Camera3RequestDescriptor::StreamBuffer *streamBuffer,
>>   				  PostProcessor::Status status) {
>> @@ -112,6 +113,8 @@ int CameraStream::configure()
>>   				cameraDevice_->streamProcessingComplete(streamBuffer,
>>   									bufferStatus);
>>   			});
>> +
>> +		worker_->start();
>>   	}
>>   
>>   	if (type_ == Type::Internal) {
>> @@ -178,10 +181,6 @@ int CameraStream::process(Camera3RequestDescriptor::StreamBuffer *streamBuffer)
>>   		streamBuffer->fence = -1;
>>   	}
>>   
>> -	/*
>> -	 * \todo Buffer mapping and processing should be moved to a
>> -	 * separate thread.
>> -	 */
>>   	const StreamConfiguration &output = configuration();
>>   	streamBuffer->dstBuffer = std::make_unique<CameraBuffer>(
>>   		*streamBuffer->camera3Buffer, output.pixelFormat, output.size,
>> @@ -191,11 +190,19 @@ int CameraStream::process(Camera3RequestDescriptor::StreamBuffer *streamBuffer)
>>   		return -EINVAL;
>>   	}
>>   
>> -	postProcessor_->process(streamBuffer);
>> +	worker_->queueRequest(streamBuffer);
>>   
>>   	return 0;
>>   }
>>   
>> +void CameraStream::flush()
>> +{
>> +	if (!postProcessor_)
>> +		return;
>> +
>> +	worker_->flush();
>> +}
>> +
>>   FrameBuffer *CameraStream::getBuffer()
>>   {
>>   	if (!allocator_)
>> @@ -223,3 +230,87 @@ void CameraStream::putBuffer(FrameBuffer *buffer)
>>   
>>   	buffers_.push_back(buffer);
>>   }
>> +
>> +CameraStream::PostProcessorWorker::PostProcessorWorker(PostProcessor *postProcessor)
>> +	: postProcessor_(postProcessor)
>> +{
>> +}
>> +
>> +CameraStream::PostProcessorWorker::~PostProcessorWorker()
>> +{
>> +	{
>> +		libcamera::MutexLocker lock(mutex_);
>> +		state_ = State::Stopped;
>> +	}
>> +
>> +	cv_.notify_one();
>> +	wait();
>> +}
>> +
>> +void CameraStream::PostProcessorWorker::start()
>> +{
>> +	{
>> +		libcamera::MutexLocker lock(mutex_);
>> +		ASSERT(state_ != State::Running);
>> +		state_ = State::Running;
>> +	}
>> +
>> +	Thread::start();
>> +}
>> +
>> +void CameraStream::PostProcessorWorker::queueRequest(Camera3RequestDescriptor::StreamBuffer *dest)
>> +{
>> +	{
>> +		MutexLocker lock(mutex_);
>> +		ASSERT(state_ == State::Running);
>> +		requests_.push(dest);
>> +	}
>> +
>> +	cv_.notify_one();
>> +}
>> +
>> +void CameraStream::PostProcessorWorker::run()
>> +{
>> +	MutexLocker locker(mutex_);
>> +
>> +	while (1) {
>> +		cv_.wait(locker, [&] {
>> +			return state_ != State::Running || !requests_.empty();
>> +		});
>> +
>> +		if (state_ != State::Running)
>> +			break;
>> +
>> +		Camera3RequestDescriptor::StreamBuffer *streamBuffer = requests_.front();
>> +		requests_.pop();
>> +		locker.unlock();
>> +
>> +		postProcessor_->process(streamBuffer);
>> +
>> +		locker.lock();
>> +	}
>> +
>> +	if (state_ == State::Flushing) {
>> +		std::queue<Camera3RequestDescriptor::StreamBuffer *> requests =
>> +			std::move(requests_);
>> +		locker.unlock();
>> +
>> +		while (!requests.empty()) {
>> +			postProcessor_->processComplete.emit(
>> +				requests.front(), PostProcessor::Status::Error);
>> +			requests.pop();
>> +		}
>> +
>> +		locker.lock();
>> +		state_ = State::Stopped;
>> +	}
>> +}
>> +
>> +void CameraStream::PostProcessorWorker::flush()
>> +{
>> +	libcamera::MutexLocker lock(mutex_);
>> +	state_ = State::Flushing;
>> +	lock.unlock();
>> +
>> +	cv_.notify_one();
>> +}
>> diff --git a/src/android/camera_stream.h b/src/android/camera_stream.h
>> index e74a9a3b..1588938a 100644
>> --- a/src/android/camera_stream.h
>> +++ b/src/android/camera_stream.h
>> @@ -7,12 +7,16 @@
>>   #ifndef __ANDROID_CAMERA_STREAM_H__
>>   #define __ANDROID_CAMERA_STREAM_H__
>>   
>> +#include <condition_variable>
>>   #include <memory>
>>   #include <mutex>
>> +#include <queue>
>>   #include <vector>
>>   
>>   #include <hardware/camera3.h>
>>   
>> +#include <libcamera/base/thread.h>
>> +
>>   #include <libcamera/camera.h>
>>   #include <libcamera/framebuffer.h>
>>   #include <libcamera/framebuffer_allocator.h>
>> @@ -20,6 +24,7 @@
>>   #include <libcamera/pixel_format.h>
>>   
>>   #include "camera_request.h"
>> +#include "post_processor.h"
>>   
>>   class CameraDevice;
>>   class PostProcessor;
> You can drop the forward declaration.


Ouch. This is what happens when you manually cherry-pick patches because
the conflicts were too hard to deal with :-/

I will address these changes locally. Should I post a v8 with those
changes?
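
For anyone following the thread without the tree at hand, the
wait/process/flush loop of PostProcessorWorker::run() can be sketched
outside libcamera roughly as below. This is an illustration under
assumptions, not the patch itself: Worker, the int jobs and the
process/cancel callbacks are hypothetical, std::thread replaces
libcamera::Thread, and flush() joins the thread here (the patch does
not) so that a caller can observe the outcome.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

/* Hypothetical worker; names mirror the patch, but this is not libcamera code. */
class Worker
{
public:
	enum class State { Stopped, Running, Flushing };

	/* process handles a job; cancel reports a job dropped by flush(). */
	Worker(std::function<void(int)> process, std::function<void(int)> cancel)
		: process_(std::move(process)), cancel_(std::move(cancel))
	{
	}

	~Worker() { stop(State::Stopped); }

	void start()
	{
		{
			std::lock_guard<std::mutex> lock(mutex_);
			assert(state_ != State::Running);
			state_ = State::Running;
		}
		thread_ = std::thread(&Worker::run, this);
	}

	void queueRequest(int job)
	{
		{
			std::lock_guard<std::mutex> lock(mutex_);
			assert(state_ == State::Running);
			requests_.push(job);
		}
		cv_.notify_one();
	}

	/* Assumption of this sketch: flush() also waits for the thread to exit. */
	void flush() { stop(State::Flushing); }

private:
	void stop(State state)
	{
		{
			std::lock_guard<std::mutex> lock(mutex_);
			if (state_ == State::Running)
				state_ = state;
		}
		cv_.notify_one();
		if (thread_.joinable())
			thread_.join();
	}

	void run()
	{
		std::unique_lock<std::mutex> locker(mutex_);

		while (true) {
			cv_.wait(locker, [&] {
				return state_ != State::Running || !requests_.empty();
			});
			if (state_ != State::Running)
				break;

			int job = requests_.front();
			requests_.pop();

			/* Process without the lock so queueRequest() is never blocked by a job. */
			locker.unlock();
			process_(job);
			locker.lock();
		}

		if (state_ == State::Flushing) {
			/* Take the pending jobs and report each one as cancelled. */
			std::queue<int> pending = std::move(requests_);
			locker.unlock();
			for (; !pending.empty(); pending.pop())
				cancel_(pending.front());
			locker.lock();
			state_ = State::Stopped;
		}
	}

	std::function<void(int)> process_;
	std::function<void(int)> cancel_;
	std::mutex mutex_;
	std::condition_variable cv_;
	std::queue<int> requests_;
	State state_ = State::Stopped;
	std::thread thread_;
};
```

How many queued jobs get processed versus cancelled depends on timing,
but after flush() returns every queued job has gone through exactly one
of the two callbacks.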

>
>> @@ -124,8 +129,38 @@ public:
>>   	int process(Camera3RequestDescriptor::StreamBuffer *streamBuffer);
>>   	libcamera::FrameBuffer *getBuffer();
>>   	void putBuffer(libcamera::FrameBuffer *buffer);
>> +	void flush();
>>   
>>   private:
>> +	class PostProcessorWorker : public libcamera::Thread
>> +	{
>> +	public:
>> +		enum class State {
>> +			Stopped,
>> +			Running,
>> +			Flushing,
>> +		};
>> +
>> +		PostProcessorWorker(PostProcessor *postProcessor);
>> +		~PostProcessorWorker();
>> +
>> +		void start();
>> +		void queueRequest(Camera3RequestDescriptor::StreamBuffer *request);
>> +		void flush();
>> +
>> +	protected:
>> +		void run() override;
>> +
>> +	private:
>> +		PostProcessor *postProcessor_;
>> +
>> +		libcamera::Mutex mutex_;
>> +		std::condition_variable cv_;
>> +
>> +		std::queue<Camera3RequestDescriptor::StreamBuffer *> requests_;
>> +		State state_ = State::Stopped;
>> +	};
>> +
>>   	int waitFence(int fence);
>>   
>>   	CameraDevice *const cameraDevice_;
>> @@ -142,6 +177,8 @@ private:
>>   	 */
>>   	std::unique_ptr<std::mutex> mutex_;
>>   	std::unique_ptr<PostProcessor> postProcessor_;
>> +
>> +	std::unique_ptr<PostProcessorWorker> worker_;
>>   };
>>   
>>   #endif /* __ANDROID_CAMERA_STREAM__ */

