[libcamera-devel] [PATCH 2/6] libcamera: thread: Add a messaging passing API

Niklas Söderlund niklas.soderlund at ragnatech.se
Thu Jul 11 07:24:51 CEST 2019


Hi Laurent,

Thanks for your patch.

On 2019-07-10 22:17:04 +0300, Laurent Pinchart wrote:
> Create a new Message class to model a message that can be passed to an
> object living in another thread. Only an invalid message type is
> currently defined, more messages will be added in the future.
> 
> The Thread class is extended with a messages queue, and the Object class
> with thread affinity.
> 
> Signed-off-by: Laurent Pinchart <laurent.pinchart at ideasonboard.com>
> ---
>  include/libcamera/object.h      |  13 +++
>  src/libcamera/include/message.h |  37 ++++++++
>  src/libcamera/include/thread.h  |   9 ++
>  src/libcamera/meson.build       |   2 +
>  src/libcamera/message.cpp       |  71 +++++++++++++++
>  src/libcamera/object.cpp        |  77 ++++++++++++++++-
>  src/libcamera/thread.cpp        | 147 +++++++++++++++++++++++++++++++-
>  7 files changed, 354 insertions(+), 2 deletions(-)
>  create mode 100644 src/libcamera/include/message.h
>  create mode 100644 src/libcamera/message.cpp
> 
> diff --git a/include/libcamera/object.h b/include/libcamera/object.h
> index eadd41f9a41f..d61dfb1ebaef 100644
> --- a/include/libcamera/object.h
> +++ b/include/libcamera/object.h
> @@ -8,26 +8,39 @@
>  #define __LIBCAMERA_OBJECT_H__
>  
>  #include <list>
> +#include <memory>
>  
>  namespace libcamera {
>  
> +class Message;
>  class SignalBase;
>  template<typename... Args>
>  class Signal;
> +class Thread;
>  
>  class Object
>  {
>  public:
> +	Object();
>  	virtual ~Object();
>  
> +	void postMessage(std::unique_ptr<Message> msg);
> +	virtual void message(Message *msg);
> +
> +	Thread *thread() const { return thread_; }
> +	void moveToThread(Thread *thread);
> +
>  private:
>  	template<typename... Args>
>  	friend class Signal;
> +	friend class Thread;
>  
>  	void connect(SignalBase *signal);
>  	void disconnect(SignalBase *signal);
>  
> +	Thread *thread_;
>  	std::list<SignalBase *> signals_;
> +	unsigned int pendingMessages_;
>  };
>  
>  }; /* namespace libcamera */
> diff --git a/src/libcamera/include/message.h b/src/libcamera/include/message.h
> new file mode 100644
> index 000000000000..97c9b80ec0e0
> --- /dev/null
> +++ b/src/libcamera/include/message.h
> @@ -0,0 +1,37 @@
> +/* SPDX-License-Identifier: LGPL-2.1-or-later */
> +/*
> + * Copyright (C) 2019, Google Inc.
> + *
> + * message.h - Message queue support
> + */
> +#ifndef __LIBCAMERA_MESSAGE_H__
> +#define __LIBCAMERA_MESSAGE_H__
> +
> +namespace libcamera {
> +
> +class Object;
> +class Thread;
> +
> +class Message
> +{
> +public:
> +	enum Type {
> +		None = 0,
> +	};
> +
> +	Message(Type type);
> +	virtual ~Message();
> +
> +	Type type() const { return type_; }
> +	Object *receiver() const { return receiver_; }
> +
> +private:
> +	friend class Thread;
> +
> +	Type type_;
> +	Object *receiver_;
> +};
> +
> +} /* namespace libcamera */
> +
> +#endif /* __LIBCAMERA_MESSAGE_H__ */
> diff --git a/src/libcamera/include/thread.h b/src/libcamera/include/thread.h
> index e881d90e9367..acae91cb6457 100644
> --- a/src/libcamera/include/thread.h
> +++ b/src/libcamera/include/thread.h
> @@ -16,6 +16,8 @@
>  namespace libcamera {
>  
>  class EventDispatcher;
> +class Message;
> +class Object;
>  class ThreadData;
>  class ThreadMain;
>  
> @@ -49,9 +51,16 @@ private:
>  	void startThread();
>  	void finishThread();
>  
> +	void postMessage(std::unique_ptr<Message> msg, Object *receiver);
> +	void removeMessages(Object *receiver);
> +	void dispatchMessages();
> +
> +	friend class Object;
>  	friend class ThreadData;
>  	friend class ThreadMain;
>  
> +	void moveObject(Object *object);
> +
>  	std::thread thread_;
>  	ThreadData *data_;
>  };
> diff --git a/src/libcamera/meson.build b/src/libcamera/meson.build
> index bf71524f768c..3e5097a4cdc7 100644
> --- a/src/libcamera/meson.build
> +++ b/src/libcamera/meson.build
> @@ -18,6 +18,7 @@ libcamera_sources = files([
>      'log.cpp',
>      'media_device.cpp',
>      'media_object.cpp',
> +    'message.cpp',
>      'object.cpp',
>      'pipeline_handler.cpp',
>      'request.cpp',
> @@ -45,6 +46,7 @@ libcamera_headers = files([
>      'include/log.h',
>      'include/media_device.h',
>      'include/media_object.h',
> +    'include/message.h',
>      'include/pipeline_handler.h',
>      'include/thread.h',
>      'include/utils.h',
> diff --git a/src/libcamera/message.cpp b/src/libcamera/message.cpp
> new file mode 100644
> index 000000000000..47caf44dc82d
> --- /dev/null
> +++ b/src/libcamera/message.cpp
> @@ -0,0 +1,71 @@
> +/* SPDX-License-Identifier: LGPL-2.1-or-later */
> +/*
> + * Copyright (C) 2019, Google Inc.
> + *
> + * message.cpp - Message queue support
> + */
> +
> +#include "message.h"
> +
> +#include "log.h"
> +
> +/**
> + * \file message.h
> + * \brief Message queue support
> + *
> + * The messaging API enables inter-thread communication through message
> + * posting. Messages can be sent from any thread to any recipient deriving from
> + * the Object class.
> + *
> + * The post a message, the sender allocates it dynamically as instance of a

s/The/To/

Reviewed-by: Niklas Söderlund <niklas.soderlund at ragnatech.se>

> + * class derived from Message. It then posts the message to an Object recipient
> + * through Object::postMessage(). Message ownership is passed to the object,
> + * the message shall thus not store any temporary data.
> + *
> + * The message is delivered in the context of the object's thread, through the
> + * Object::message() virtual method. After delivery the message is
> + * automatically deleted.
> + */
> +
> +namespace libcamera {
> +
> +LOG_DEFINE_CATEGORY(Message)
> +
> +/**
> + * \class Message
> + * \brief A message that can be posted to a Thread
> + */
> +
> +/**
> + * \enum Message::Type
> + * \brief The message type
> + * \var Message::None
> + * \brief Invalid message type
> + */
> +
> +/**
> + * \brief Construct a message object of type \a type
> + * \param[in] type The message type
> + */
> +Message::Message(Message::Type type)
> +	: type_(type)
> +{
> +}
> +
> +Message::~Message()
> +{
> +}
> +
> +/**
> + * \fn Message::type()
> + * \brief Retrieve the message type
> + * \return The message type
> + */
> +
> +/**
> + * \fn Message::receiver()
> + * \brief Retrieve the message receiver
> + * \return The message receiver
> + */
> +
> +}; /* namespace libcamera */
> diff --git a/src/libcamera/object.cpp b/src/libcamera/object.cpp
> index a504ca2c9daf..695e6c11b3a4 100644
> --- a/src/libcamera/object.cpp
> +++ b/src/libcamera/object.cpp
> @@ -9,6 +9,9 @@
>  
>  #include <libcamera/signal.h>
>  
> +#include "log.h"
> +#include "thread.h"
> +
>  /**
>   * \file object.h
>   * \brief Base object to support automatic signal disconnection
> @@ -24,13 +27,85 @@ namespace libcamera {
>   * slots. By inheriting from Object, an object is automatically disconnected
>   * from all connected signals when it gets destroyed.
>   *
> - * \sa Signal
> + * Object instance are bound to the thread in which they're created. When a
> + * message is posted to an object, its handler will run in the object's thread.
> + * This allows implementing easy message passing between threads by inheriting
> + * from the Object class.
> + *
> + * \sa Message, Signal, Thread
>   */
>  
> +Object::Object()
> +	: pendingMessages_(0)
> +{
> +	thread_ = Thread::current();
> +}
> +
>  Object::~Object()
>  {
>  	for (SignalBase *signal : signals_)
>  		signal->disconnect(this);
> +
> +	if (pendingMessages_)
> +		thread()->removeMessages(this);
> +}
> +
> +/**
> + * \brief Post a message to the object's thread
> + * \param[in] msg The message
> + *
> + * This method posts the message \a msg to the message queue of the object's
> + * thread, to be delivered to the object through the message() method in the
> + * context of its thread. Message ownership is passed to the thread, and the
> + * message will be deleted after being delivered.
> + *
> + * Messages are delivered through the thread's event loop. If the thread is not
> + * running its event loop the message will not be delivered until the event
> + * loop gets started.
> + */
> +void Object::postMessage(std::unique_ptr<Message> msg)
> +{
> +	thread()->postMessage(std::move(msg), this);
> +}
> +
> +/**
> + * \brief Message handler for the object
> + * \param[in] msg The message
> + *
> + * This virtual method receives messages for the object. It is called in the
> + * context of the object's thread, and can be overridden to process custom
> + * messages. The parent QObject::message() method shall be called for any
> + * message not handled by the override method.
> + *
> + * The message \a msg is valid only for the duration of the call, no reference
> + * to it shall be kept after this method returns.
> + */
> +void Object::message(Message *msg)
> +{
> +}
> +
> +/**
> + * \fn Object::thread()
> + * \brief Retrieve the thread the object is bound to
> + * \return The thread the object is bound to
> + */
> +
> +/**
> + * \brief Move the object to a different thread
> + * \param[in] thread The target thread
> + *
> + * This method moves the object from the current thread to the new \a thread.
> + * It shall be called from the thread in which the object currently lives,
> + * otherwise the behaviour is undefined.
> + */
> +void Object::moveToThread(Thread *thread)
> +{
> +	ASSERT(Thread::current() == thread_);
> +
> +	if (thread_ == thread)
> +		return;
> +
> +	thread->moveObject(this);
>  }
>  
>  void Object::connect(SignalBase *signal)
> diff --git a/src/libcamera/thread.cpp b/src/libcamera/thread.cpp
> index 95636ecaab53..5d46eeb8d3a5 100644
> --- a/src/libcamera/thread.cpp
> +++ b/src/libcamera/thread.cpp
> @@ -8,11 +8,13 @@
>  #include "thread.h"
>  
>  #include <atomic>
> +#include <list>
>  
>  #include <libcamera/event_dispatcher.h>
>  
>  #include "event_dispatcher_poll.h"
>  #include "log.h"
> +#include "message.h"
>  
>  /**
>   * \file thread.h
> @@ -25,6 +27,22 @@ LOG_DEFINE_CATEGORY(Thread)
>  
>  class ThreadMain;
>  
> +/**
> + * \brief A queue of posted messages
> + */
> +class MessageQueue
> +{
> +public:
> +	/**
> +	 * \brief List of queued Message instances
> +	 */
> +	std::list<std::unique_ptr<Message>> list_;
> +	/**
> +	 * \brief Protects the \ref list_
> +	 */
> +	Mutex mutex_;
> +};
> +
>  /**
>   * \brief Thread-local internal data
>   */
> @@ -51,6 +69,8 @@ private:
>  
>  	std::atomic<bool> exit_;
>  	int exitCode_;
> +
> +	MessageQueue messages_;
>  };
>  
>  /**
> @@ -192,8 +212,10 @@ int Thread::exec()
>  
>  	locker.unlock();
>  
> -	while (!data_->exit_.load(std::memory_order_acquire))
> +	while (!data_->exit_.load(std::memory_order_acquire)) {
> +		dispatchMessages();
>  		dispatcher->processEvents();
> +	}
>  
>  	locker.lock();
>  
> @@ -332,4 +354,127 @@ EventDispatcher *Thread::eventDispatcher()
>  	return data_->dispatcher_.load(std::memory_order_relaxed);
>  }
>  
> +/**
> + * \brief Post a message to the thread for the \a receiver
> + * \param[in] msg The message
> + * \param[in] receiver The receiver
> + *
> + * This method stores the message \a msg in the message queue of the thread for
> + * the \a receiver and wake up the thread's event loop. Message ownership is
> + * passed to the thread, and the message will be deleted after being delivered.
> + *
> + * Messages are delivered through the thread's event loop. If the thread is not
> + * running its event loop the message will not be delivered until the event
> + * loop gets started.
> + *
> + * If the \a receiver is not bound to this thread the behaviour is undefined.
> + *
> + * \sa exec()
> + */
> +void Thread::postMessage(std::unique_ptr<Message> msg, Object *receiver)
> +{
> +	msg->receiver_ = receiver;
> +
> +	ASSERT(data_ == receiver->thread()->data_);
> +
> +	MutexLocker locker(data_->messages_.mutex_);
> +	data_->messages_.list_.push_back(std::move(msg));
> +	receiver->pendingMessages_++;
> +	locker.unlock();
> +
> +	EventDispatcher *dispatcher =
> +		data_->dispatcher_.load(std::memory_order_acquire);
> +	if (dispatcher)
> +		dispatcher->interrupt();
> +}
> +
> +/**
> + * \brief Remove all posted messages for the \a receiver
> + * \param[in] receiver The receiver
> + *
> + * If the \a receiver is not bound to this thread the behaviour is undefined.
> + */
> +void Thread::removeMessages(Object *receiver)
> +{
> +	ASSERT(data_ == receiver->thread()->data_);
> +
> +	MutexLocker locker(data_->messages_.mutex_);
> +	if (!receiver->pendingMessages_)
> +		return;
> +
> +	std::vector<std::unique_ptr<Message>> toDelete;
> +	for (std::unique_ptr<Message> &msg : data_->messages_.list_) {
> +		if (!msg)
> +			continue;
> +		if (msg->receiver_ != receiver)
> +			continue;
> +
> +		/*
> +		 * Move the message to the pending deletion list to delete it
> +		 * after releasing the lock. The messages list element will
> +		 * contain a null pointer, and will be removed when dispatching
> +		 * messages.
> +		 */
> +		toDelete.push_back(std::move(msg));
> +		receiver->pendingMessages_--;
> +	}
> +
> +	ASSERT(!receiver->pendingMessages_);
> +	locker.unlock();
> +
> +	toDelete.clear();
> +}
> +
> +/**
> + * \brief Dispatch all posted messages for this thread
> + */
> +void Thread::dispatchMessages()
> +{
> +	MutexLocker locker(data_->messages_.mutex_);
> +
> +	while (!data_->messages_.list_.empty()) {
> +		std::unique_ptr<Message> msg = std::move(data_->messages_.list_.front());
> +		data_->messages_.list_.pop_front();
> +		if (!msg)
> +			continue;
> +
> +		Object *receiver = msg->receiver_;
> +		ASSERT(data_ == receiver->thread()->data_);
> +
> +		locker.unlock();
> +		receiver->message(msg.get());
> +		locker.lock();
> +
> +		receiver->pendingMessages_--;
> +	}
> +}
> +
> +/**
> + * \brief Move an \a object to the thread
> + * \param[in] object The object
> + */
> +void Thread::moveObject(Object *object)
> +{
> +	ThreadData *currentData = object->thread_->data_;
> +	ThreadData *targetData = data_;
> +
> +	MutexLocker lockerFrom(currentData->mutex_, std::defer_lock);
> +	MutexLocker lockerTo(targetData->mutex_, std::defer_lock);
> +	std::lock(lockerFrom, lockerTo);
> +
> +	/* Move pending messages to the message queue of the new thread. */
> +	if (object->pendingMessages_) {
> +		for (std::unique_ptr<Message> &msg : currentData->messages_.list_) {
> +			if (!msg)
> +				continue;
> +			if (msg->receiver_ != object)
> +				continue;
> +
> +			targetData->messages_.list_.push_back(std::move(msg));
> +		}
> +	}
> +
> +	object->thread_ = this;
> +}
> +
>  }; /* namespace libcamera */
> -- 
> Regards,
> 
> Laurent Pinchart
> 
> _______________________________________________
> libcamera-devel mailing list
> libcamera-devel at lists.libcamera.org
> https://lists.libcamera.org/listinfo/libcamera-devel

-- 
Regards,
Niklas Söderlund


More information about the libcamera-devel mailing list