[libcamera-devel] [PATCH v5 3/4] android: post_processor: Make post processing async

Umang Jain umang.jain at ideasonboard.com
Thu Oct 21 07:53:04 CEST 2021


Hi Laurent,

On 10/21/21 7:03 AM, Laurent Pinchart wrote:
> Hi Umang,
>
> Thank you for the patch.
>
> On Wed, Oct 20, 2021 at 04:12:11PM +0530, Umang Jain wrote:
>> Introduce a dedicated worker class derived from libcamera::Thread.
>> The worker class maintains a queue for post-processing requests
>> and waits for a post-processing request to become available.
>> It will process them as per FIFO before de-queuing it from the
>> queue.
>>
>> To get access to the source and destination buffers in the worker
>> thread, we also need to save a pointer to them in the
> s/a pointer/pointers/
>
>> Camera3RequestDescriptor.
>>
>> This patch also implements a flush() for the PostProcessorWorker
>> class which is responsible to purge post-processing requests
>> queued up while a camera is stopping/flushing. It is hooked with
>> CameraStream::flush(), which isn't used currently but will be
>> used when we handle flush/stop scenarios in greater detail
>> subsequently (in a different patchset).
>>
>> The libcamera request completion handler CameraDevice::requestComplete()
>> assumes that the request that has just completed is at the front of the
>> queue. Now that the post-processor runs asynchronously, this isn't true
>> anymore, a request being post-processed will stay in the queue and a new
>> libcamera request may complete. Remove that assumption, and use the
>> request cookie to obtain the Camera3RequestDescriptor.
>>
>> Signed-off-by: Umang Jain <umang.jain at ideasonboard.com>
>> Signed-off-by: Laurent Pinchart <laurent.pinchart at ideasonboard.com>
>> ---
>>   src/android/camera_device.cpp |  46 +++++----------
>>   src/android/camera_request.h  |   4 ++
>>   src/android/camera_stream.cpp | 107 +++++++++++++++++++++++++++++++---
>>   src/android/camera_stream.h   |  38 +++++++++++-
>>   4 files changed, 156 insertions(+), 39 deletions(-)
>>
>> diff --git a/src/android/camera_device.cpp b/src/android/camera_device.cpp
>> index 541c2c81..b14416ce 100644
>> --- a/src/android/camera_device.cpp
>> +++ b/src/android/camera_device.cpp
>> @@ -1020,29 +1020,8 @@ int CameraDevice::processCaptureRequest(camera3_capture_request_t *camera3Reques
>>   
>>   void CameraDevice::requestComplete(Request *request)
>>   {
>> -	Camera3RequestDescriptor *descriptor;
>> -	{
>> -		MutexLocker descriptorsLock(descriptorsMutex_);
>> -		ASSERT(!descriptors_.empty());
>> -		descriptor = descriptors_.front().get();
>> -	}
>> -
>> -	if (descriptor->request_->cookie() != request->cookie()) {
>> -		/*
>> -		 * \todo Clarify if the Camera has to be closed on
>> -		 * ERROR_DEVICE.
>> -		 */
>> -		LOG(HAL, Error)
>> -			<< "Out-of-order completion for request "
>> -			<< utils::hex(request->cookie());
>> -
>> -		MutexLocker descriptorsLock(descriptorsMutex_);
>> -		descriptors_.pop();
>> -
>> -		notifyError(0, nullptr, CAMERA3_MSG_ERROR_DEVICE);
>> -
>> -		return;
>> -	}
>> +	Camera3RequestDescriptor *descriptor =
>> +		reinterpret_cast<Camera3RequestDescriptor *>(request->cookie());
>>   
>>   	/*
>>   	 * Prepare the capture result for the Android camera stack.
>> @@ -1120,12 +1099,7 @@ void CameraDevice::requestComplete(Request *request)
>>   	bool needsPostProcessing = false;
>>   	Camera3RequestDescriptor::Status processingStatus =
>>   		Camera3RequestDescriptor::Status::Pending;
>> -	/*
>> -	 * \todo Apply streamsProcessMutex_ when post-processing is adapted to run
>> -	 * asynchronously. If we introduce the streamsProcessMutex_ right away, the
>> -	 * lock will be held forever since it is synchronous at this point
>> -	 * (see streamProcessingComplete).
>> -	 */
>> +
>>   	for (auto &buffer : descriptor->buffers_) {
>>   		CameraStream *stream = buffer.stream;
>>   
>> @@ -1145,7 +1119,13 @@ void CameraDevice::requestComplete(Request *request)
>>   			buffer.internalBuffer = src;
>>   
>>   		needsPostProcessing = true;
>> -		int ret = stream->process(*src, buffer, descriptor);
>> +
>> +		int ret;
>> +		{
>> +			MutexLocker locker(descriptor->streamsProcessMutex_);
> It may be possible (see my review of 2/4) to only lock access to the
> data structures and not cover the process() call. Let's see how it turns
> out.


Yes, I need to draft the code, and only then will I be able to confirm. My
deep-code visual skills are kinda limited.

>
>> +			ret = stream->process(*src, buffer, descriptor);
>> +		}
>> +
>>   		if (ret) {
>>   			setBufferStatus(stream, buffer, descriptor,
>>   					Camera3RequestDescriptor::Status::Error);
>> @@ -1156,6 +1136,12 @@ void CameraDevice::requestComplete(Request *request)
>>   	if (needsPostProcessing) {
>>   		if (processingStatus == Camera3RequestDescriptor::Status::Error) {
>>   			descriptor->status_ = processingStatus;
>> +			/*
>> +			 * \todo This is problematic when some streams are
>> +			 * queued successfully, but some fail to get queued.
>> +			 * We might end up with use-after-free situation for
>> +			 * descriptor in streamProcessingComplete().
>> +			 */
> Hopefully this will be fixed in v6 of 2/4 :-)
>
>>   			sendCaptureResults();
>>   		}
>>   
>> diff --git a/src/android/camera_request.h b/src/android/camera_request.h
>> index 3a2774e0..85274414 100644
>> --- a/src/android/camera_request.h
>> +++ b/src/android/camera_request.h
>> @@ -18,6 +18,7 @@
>>   
>>   #include <hardware/camera3.h>
>>   
>> +#include "camera_buffer.h"
>>   #include "camera_metadata.h"
>>   #include "camera_worker.h"
>>   
>> @@ -39,6 +40,9 @@ public:
>>   		int fence;
>>   		Status status;
>>   		libcamera::FrameBuffer *internalBuffer = nullptr;
>> +		std::unique_ptr<CameraBuffer> destBuffer;
>> +		const libcamera::FrameBuffer *srcBuffer;
>> +		Camera3RequestDescriptor *request = nullptr;
>>   	};
>>   
>>   	Camera3RequestDescriptor(libcamera::Camera *camera,
>> diff --git a/src/android/camera_stream.cpp b/src/android/camera_stream.cpp
>> index b3cb06a2..a29ce33b 100644
>> --- a/src/android/camera_stream.cpp
>> +++ b/src/android/camera_stream.cpp
>> @@ -99,6 +99,7 @@ int CameraStream::configure()
>>   		if (ret)
>>   			return ret;
>>   
>> +		worker_ = std::make_unique<PostProcessorWorker>(postProcessor_.get());
>>   		postProcessor_->processComplete.connect(
>>   			this, [&](Camera3RequestDescriptor *request, PostProcessor::Status status) {
>>   				Camera3RequestDescriptor::Status bufferStatus =
>> @@ -110,6 +111,7 @@ int CameraStream::configure()
>>   				cameraDevice_->streamProcessingComplete(this, request,
>>   									bufferStatus);
>>   			});
>> +		worker_->start();
>>   	}
>>   
>>   	if (type_ == Type::Internal) {
>> @@ -179,23 +181,31 @@ int CameraStream::process(const FrameBuffer &source,
>>   	if (!postProcessor_)
>>   		return 0;
>>   
>> -	/*
>> -	 * \todo Buffer mapping and processing should be moved to a
>> -	 * separate thread.
>> -	 */
>>   	const StreamConfiguration &output = configuration();
>> -	CameraBuffer destBuffer(*dest.camera3Buffer, output.pixelFormat,
>> -				output.size, PROT_READ | PROT_WRITE);
>> -	if (!destBuffer.isValid()) {
>> +	dest.destBuffer = std::make_unique<CameraBuffer>(
>> +		*dest.camera3Buffer, output.pixelFormat, output.size, PROT_READ | PROT_WRITE);
> 	dest.destBuffer = std::make_unique<CameraBuffer>(
> 		*dest.camera3Buffer, output.pixelFormat, output.size,
> 		PROT_READ | PROT_WRITE);
>
>> +	if (!dest.destBuffer->isValid()) {
>>   		LOG(HAL, Error) << "Failed to create destination buffer";
>>   		return -EINVAL;
>>   	}
>>   
>> -	postProcessor_->process(source, &destBuffer, request);
>> +	dest.srcBuffer = &source;
>> +	dest.request = request;
>> +
>> +	/* Push the postProcessor request to the worker queue. */
>> +	worker_->queueRequest(&dest);
>>   
>>   	return 0;
>>   }
>>   
>> +void CameraStream::flush()
>> +{
>> +	if (!postProcessor_)
>> +		return;
>> +
>> +	worker_->flush();
>> +}
>> +
>>   FrameBuffer *CameraStream::getBuffer()
>>   {
>>   	if (!allocator_)
>> @@ -223,3 +233,84 @@ void CameraStream::putBuffer(FrameBuffer *buffer)
>>   
>>   	buffers_.push_back(buffer);
>>   }
>> +
>> +CameraStream::PostProcessorWorker::PostProcessorWorker(PostProcessor *postProcessor)
>> +	: postProcessor_(postProcessor)
>> +{
>> +}
>> +
>> +CameraStream::PostProcessorWorker::~PostProcessorWorker()
>> +{
>> +	{
>> +		libcamera::MutexLocker lock(mutex_);
>> +		state_ = State::Stopped;
>> +	}
>> +
>> +	cv_.notify_one();
>> +	wait();
>> +}
>> +
>> +void CameraStream::PostProcessorWorker::start()
>> +{
>> +	{
>> +		libcamera::MutexLocker lock(mutex_);
> Could you add a
>
> 		ASSERT(state_ != State::Running);
>
> here ? I'm worried that a future refactoring will call
> CameraStream::configure() multiple times, and that would result in the
> worker being started multiple times too.


Ack

>
>> +		state_ = State::Running;
>> +	}
>> +
>> +	Thread::start();
>> +}
>> +
>> +void CameraStream::PostProcessorWorker::queueRequest(Camera3RequestDescriptor::StreamBuffer *dest)
>> +{
>> +	{
>> +		MutexLocker lock(mutex_);
>> +		ASSERT(state_ == State::Running);
>> +		requests_.push(dest);
>> +	}
>> +
>> +	cv_.notify_one();
>> +}
>> +
>> +void CameraStream::PostProcessorWorker::run()
>> +{
>> +	MutexLocker locker(mutex_);
>> +
>> +	while (1) {
>> +		cv_.wait(locker, [&] {
>> +			return state_ != State::Running || !requests_.empty();
>> +		});
>> +
>> +		if (state_ != State::Running)
>> +			break;
>> +
>> +		Camera3RequestDescriptor::StreamBuffer *stream = requests_.front();
>> +		requests_.pop();
>> +		locker.unlock();
>> +
>> +		postProcessor_->process(*stream->srcBuffer, stream->destBuffer.get(),
>> +					stream->request);
>> +
>> +		locker.lock();
>> +	}
>> +
>> +	if (state_ == State::Flushing) {
>> +		while (!requests_.empty()) {
>> +			postProcessor_->processComplete.emit(
>> +				requests_.front()->request,
>> +				PostProcessor::Status::Error);
> I'm also a tiny bit concerned here that we're emitting the signal with
> the lock held, while for requests that complete normally we don't. The
> behaviour isn't consistent and could cause issues in the CameraStream or
> CameraDevice classes. I think it's harmless for now, but maybe we could
> drop the lock around the emit() call ? One option to avoid dropping and
> taking the lock in every iteration would be
>
> 		std::queue<Camera3RequestDescriptor::StreamBuffer *> requests = std::move(requests_);
>
> 		locker.unlock();
>
> 		while (!requests.empty()) {
> 			postProcessor_->processComplete.emit(
> 				requests.front()->request,
> 				PostProcessor::Status::Error);
> 			requests.pop();
> 		}
>
> 		locker.lock();
>
> And now that I think about it, given that we now have a StreamBuffer
> object, wouldn't it be simpler to pass the StreamBuffer instead of the
> Camera3RequestDescriptor to the signal ?


Yesterday, Jacopo might have suggested the same. I'll try to accommodate this.

>> +		}
>> +
>> +		state_ = State::Stopped;
>> +		locker.unlock();
> You can drop the unlock() call here.


Ack. Shouldn't be here

>
>> +	}
>> +}
>> +
>> +void CameraStream::PostProcessorWorker::flush()
>> +{
>> +	libcamera::MutexLocker lock(mutex_);
>> +	state_ = State::Flushing;
>> +	lock.unlock();
>> +
>> +	cv_.notify_one();
>> +}
>> diff --git a/src/android/camera_stream.h b/src/android/camera_stream.h
>> index f242336e..64e32c77 100644
>> --- a/src/android/camera_stream.h
>> +++ b/src/android/camera_stream.h
>> @@ -7,12 +7,16 @@
>>   #ifndef __ANDROID_CAMERA_STREAM_H__
>>   #define __ANDROID_CAMERA_STREAM_H__
>>   
>> +#include <condition_variable>
>>   #include <memory>
>>   #include <mutex>
>> +#include <queue>
>>   #include <vector>
>>   
>>   #include <hardware/camera3.h>
>>   
>> +#include <libcamera/base/thread.h>
>> +
>>   #include <libcamera/camera.h>
>>   #include <libcamera/framebuffer.h>
>>   #include <libcamera/framebuffer_allocator.h>
>> @@ -20,9 +24,9 @@
>>   #include <libcamera/pixel_format.h>
>>   
>>   #include "camera_request.h"
>> +#include "post_processor.h"
>>   
>>   class CameraDevice;
>> -class PostProcessor;
>>   
>>   class CameraStream
>>   {
>> @@ -126,8 +130,38 @@ public:
>>   		    Camera3RequestDescriptor *request);
>>   	libcamera::FrameBuffer *getBuffer();
>>   	void putBuffer(libcamera::FrameBuffer *buffer);
>> +	void flush();
>>   
>>   private:
>> +	class PostProcessorWorker : public libcamera::Thread
>> +	{
>> +	public:
>> +		enum class State {
>> +			Stopped,
>> +			Running,
>> +			Flushing,
>> +		};
>> +
>> +		PostProcessorWorker(PostProcessor *postProcessor);
>> +		~PostProcessorWorker();
>> +
>> +		void start();
>> +		void queueRequest(Camera3RequestDescriptor::StreamBuffer *request);
>> +		void flush();
>> +
>> +	protected:
>> +		void run() override;
>> +
>> +	private:
>> +		PostProcessor *postProcessor_;
>> +
>> +		libcamera::Mutex mutex_;
>> +		std::condition_variable cv_;
>> +
>> +		std::queue<Camera3RequestDescriptor::StreamBuffer *> requests_;
>> +		State state_;
>> +	};
>> +
>>   	int waitFence(int fence);
>>   
>>   	CameraDevice *const cameraDevice_;
>> @@ -144,6 +178,8 @@ private:
>>   	 */
>>   	std::unique_ptr<std::mutex> mutex_;
>>   	std::unique_ptr<PostProcessor> postProcessor_;
>> +
>> +	std::unique_ptr<PostProcessorWorker> worker_;
>>   };
>>   
>>   #endif /* __ANDROID_CAMERA_STREAM__ */


More information about the libcamera-devel mailing list