[libcamera-devel] [PATCH v2 1/3] libcamera: thread: Support selective message dispatch to thread

Laurent Pinchart laurent.pinchart at ideasonboard.com
Fri Jul 31 14:30:52 CEST 2020


Hi Umang,

Thank you for the patch.

On Fri, Jul 31, 2020 at 10:53:41AM +0000, Umang Jain wrote:
> Extend the current dispatchMessages() to support dispatching of
> selective messsages according to the Message::Type passed in
> the function argument. dispatchMessages() can now be called
> explicitly to force deliver selected type's message to the
> thread for processing (typically when event loop is not
> running).
> 
> Add a helper Message::Type::CatchAll message type to deliver every
> message posted to the thread.
> 
> Signed-off-by: Umang Jain <email at uajain.com>
> ---
>  include/libcamera/internal/message.h    |  1 +
>  include/libcamera/internal/thread.h     |  3 +-
>  src/libcamera/event_dispatcher_poll.cpp |  2 +-
>  src/libcamera/message.cpp               |  2 ++
>  src/libcamera/thread.cpp                | 47 ++++++++++++++++++-------
>  5 files changed, 40 insertions(+), 15 deletions(-)
> 
> diff --git a/include/libcamera/internal/message.h b/include/libcamera/internal/message.h
> index 92ea64a..b8d9866 100644
> --- a/include/libcamera/internal/message.h
> +++ b/include/libcamera/internal/message.h
> @@ -25,6 +25,7 @@ public:
>  		None = 0,
>  		InvokeMessage = 1,
>  		ThreadMoveMessage = 2,
> +		CatchAll = 999,

I don't think we need this, using 0 should be good enough for the
purpose at hand.

>  		UserMessage = 1000,
>  	};
>  
> diff --git a/include/libcamera/internal/thread.h b/include/libcamera/internal/thread.h
> index 7b59e58..1dfeb72 100644
> --- a/include/libcamera/internal/thread.h
> +++ b/include/libcamera/internal/thread.h
> @@ -14,6 +14,7 @@
>  
>  #include <libcamera/signal.h>
>  
> +#include "libcamera/internal/message.h"
>  #include "libcamera/internal/utils.h"
>  
>  namespace libcamera {
> @@ -47,7 +48,7 @@ public:
>  	EventDispatcher *eventDispatcher();
>  	void setEventDispatcher(std::unique_ptr<EventDispatcher> dispatcher);
>  
> -	void dispatchMessages();
> +	void dispatchMessages(Message::Type type);

I'd add a default value here

	void dispatchMessages(Message::Type type = Message::Type::None);

>  
>  protected:
>  	int exec();
> diff --git a/src/libcamera/event_dispatcher_poll.cpp b/src/libcamera/event_dispatcher_poll.cpp
> index 9ab85da..b9fabf8 100644
> --- a/src/libcamera/event_dispatcher_poll.cpp
> +++ b/src/libcamera/event_dispatcher_poll.cpp
> @@ -146,7 +146,7 @@ void EventDispatcherPoll::processEvents()
>  {
>  	int ret;
>  
> -	Thread::current()->dispatchMessages();
> +	Thread::current()->dispatchMessages(Message::Type::CatchAll);

So this can be dropped.

>  
>  	/* Create the pollfd array. */
>  	std::vector<struct pollfd> pollfds;
> diff --git a/src/libcamera/message.cpp b/src/libcamera/message.cpp
> index e9b3e73..e462f90 100644
> --- a/src/libcamera/message.cpp
> +++ b/src/libcamera/message.cpp
> @@ -49,6 +49,8 @@ std::atomic_uint Message::nextUserType_{ Message::UserMessage };
>   * \brief Asynchronous method invocation across threads
>   * \var Message::ThreadMoveMessage
>   * \brief Object is being moved to a different thread
> + * \var Message::CatchAll
> + * \brief Helper to match message of any type
>   * \var Message::UserMessage
>   * \brief First value available for user-defined messages
>   */
> diff --git a/src/libcamera/thread.cpp b/src/libcamera/thread.cpp
> index d1750d7..d3a5f81 100644
> --- a/src/libcamera/thread.cpp
> +++ b/src/libcamera/thread.cpp
> @@ -552,26 +552,47 @@ void Thread::removeMessages(Object *receiver)
>  }
>  
>  /**
> - * \brief Dispatch all posted messages for this thread
> + * \brief Dispatch posted messages for this thread as per \a type Message::Type.
> + *  Pass Message::Type::CatchAll to dispatch every posted message posted for
> + *  this thread.

Let's keep the brief brief :-) You're also missing the parameter
description.

 * \brief Dispatch posted messages for this thread
 * \param[in] type The message type
 *
 * This function immediately dispatches all the messages previously posted for
 * this thread with postMessage() that match the message \a type. If the \a type
 * is Message::Type::None, all messages are dispatched.

>   */
> -void Thread::dispatchMessages()
> +void Thread::dispatchMessages(Message::Type type)
>  {
>  	MutexLocker locker(data_->messages_.mutex_);
>  
> -	while (!data_->messages_.list_.empty()) {
> -		std::unique_ptr<Message> msg = std::move(data_->messages_.list_.front());
> -		data_->messages_.list_.pop_front();
> -		if (!msg)
> -			continue;
> +	if (type == Message::Type::CatchAll) {
> +		while (!data_->messages_.list_.empty()) {
> +			std::unique_ptr<Message> msg =
> +				std::move(data_->messages_.list_.front());
> +			data_->messages_.list_.pop_front();
> +			if (!msg)
> +				continue;
>  
> -		Object *receiver = msg->receiver_;
> -		ASSERT(data_ == receiver->thread()->data_);
> +			Object *receiver = msg->receiver_;
> +			ASSERT(data_ == receiver->thread()->data_);
>  
> -		receiver->pendingMessages_--;
> +			receiver->pendingMessages_--;
>  
> -		locker.unlock();
> -		receiver->message(msg.get());
> -		locker.lock();
> +			locker.unlock();
> +			receiver->message(msg.get());
> +			locker.lock();
> +		}
> +	} else {
> +		for (std::unique_ptr<Message> &msg : data_->messages_.list_) {
> +			if (!msg)
> +				continue;
> +
> +			if (msg->type() == type) {
> +				std::unique_ptr<Message> message = std::move(msg);

You need to erase the list entry here, moving it to a local variable is
fine from the point of view of deleting the message itself, but the list
entry needs to be removed too. To do so, you can use std::list::erase(),
but that requires an iterator, so the loop needs to be turned into a
regular for loop.

> +				Object *receiver = message->receiver_;
> +				ASSERT(data_ == receiver->thread()->data_);
> +				receiver->pendingMessages_--;
> +
> +				locker.unlock();
> +				receiver->message(message.get());

As an optimization, you can free the message here, as that's not an
operation that needs to be protected by the lock.

> +				locker.lock();
> +			}
> +		}
>  	}

I think you can combine both case.

	std::list<std::unique_ptr<Message>> &messages = data_->messages_.list_;

	for (auto iter = messages.begin(); iter != messages.end(); ) {
		std::unique_ptr<Message> &msg = *iter;

		if (!msg || (type != Message::Type::None && msg->type() != type)) {
			++iter;
			continue;
		}

		std::unique_ptr<Message> message = std::move(msg);
		iter = data_->messages_.list_.erase(iter);

		Object *receiver = message->receiver_;
		ASSERT(data_ == receiver->thread()->data_);
		receiver->pendingMessages_--;

		locker.unlock();
		receiver->message(message.get());
		message.reset();
		locker.lock();
	}

>  }
>  

-- 
Regards,

Laurent Pinchart


More information about the libcamera-devel mailing list