[libcamera-devel] [PATCH 2/6] libcamera: thread: Add a messaging passing API

Laurent Pinchart laurent.pinchart at ideasonboard.com
Wed Jul 10 21:17:04 CEST 2019


Create a new Message class to model a message that can be passed to an
object living in another thread. Only an invalid message type is
currently defined, more messages will be added in the future.

The Thread class is extended with a messages queue, and the Object class
with thread affinity.

Signed-off-by: Laurent Pinchart <laurent.pinchart at ideasonboard.com>
---
 include/libcamera/object.h      |  13 +++
 src/libcamera/include/message.h |  37 ++++++++
 src/libcamera/include/thread.h  |   9 ++
 src/libcamera/meson.build       |   2 +
 src/libcamera/message.cpp       |  71 +++++++++++++++
 src/libcamera/object.cpp        |  77 ++++++++++++++++-
 src/libcamera/thread.cpp        | 147 +++++++++++++++++++++++++++++++-
 7 files changed, 354 insertions(+), 2 deletions(-)
 create mode 100644 src/libcamera/include/message.h
 create mode 100644 src/libcamera/message.cpp

diff --git a/include/libcamera/object.h b/include/libcamera/object.h
index eadd41f9a41f..d61dfb1ebaef 100644
--- a/include/libcamera/object.h
+++ b/include/libcamera/object.h
@@ -8,26 +8,39 @@
 #define __LIBCAMERA_OBJECT_H__
 
 #include <list>
+#include <memory>
 
 namespace libcamera {
 
+class Message;
 class SignalBase;
 template<typename... Args>
 class Signal;
+class Thread;
 
 class Object
 {
 public:
+	Object();
 	virtual ~Object();
 
+	void postMessage(std::unique_ptr<Message> msg);
+	virtual void message(Message *msg);
+
+	Thread *thread() const { return thread_; }
+	void moveToThread(Thread *thread);
+
 private:
 	template<typename... Args>
 	friend class Signal;
+	friend class Thread;
 
 	void connect(SignalBase *signal);
 	void disconnect(SignalBase *signal);
 
+	Thread *thread_;
 	std::list<SignalBase *> signals_;
+	unsigned int pendingMessages_;
 };
 
 }; /* namespace libcamera */
diff --git a/src/libcamera/include/message.h b/src/libcamera/include/message.h
new file mode 100644
index 000000000000..97c9b80ec0e0
--- /dev/null
+++ b/src/libcamera/include/message.h
@@ -0,0 +1,37 @@
+/* SPDX-License-Identifier: LGPL-2.1-or-later */
+/*
+ * Copyright (C) 2019, Google Inc.
+ *
+ * message.h - Message queue support
+ */
+#ifndef __LIBCAMERA_MESSAGE_H__
+#define __LIBCAMERA_MESSAGE_H__
+
+namespace libcamera {
+
+class Object;
+class Thread;
+
+class Message
+{
+public:
+	enum Type {
+		None = 0,
+	};
+
+	Message(Type type);
+	virtual ~Message();
+
+	Type type() const { return type_; }
+	Object *receiver() const { return receiver_; }
+
+private:
+	friend class Thread;
+
+	Type type_;
+	Object *receiver_;
+};
+
+} /* namespace libcamera */
+
+#endif /* __LIBCAMERA_MESSAGE_H__ */
diff --git a/src/libcamera/include/thread.h b/src/libcamera/include/thread.h
index e881d90e9367..acae91cb6457 100644
--- a/src/libcamera/include/thread.h
+++ b/src/libcamera/include/thread.h
@@ -16,6 +16,8 @@
 namespace libcamera {
 
 class EventDispatcher;
+class Message;
+class Object;
 class ThreadData;
 class ThreadMain;
 
@@ -49,9 +51,16 @@ private:
 	void startThread();
 	void finishThread();
 
+	void postMessage(std::unique_ptr<Message> msg, Object *receiver);
+	void removeMessages(Object *receiver);
+	void dispatchMessages();
+
+	friend class Object;
 	friend class ThreadData;
 	friend class ThreadMain;
 
+	void moveObject(Object *object);
+
 	std::thread thread_;
 	ThreadData *data_;
 };
diff --git a/src/libcamera/meson.build b/src/libcamera/meson.build
index bf71524f768c..3e5097a4cdc7 100644
--- a/src/libcamera/meson.build
+++ b/src/libcamera/meson.build
@@ -18,6 +18,7 @@ libcamera_sources = files([
     'log.cpp',
     'media_device.cpp',
     'media_object.cpp',
+    'message.cpp',
     'object.cpp',
     'pipeline_handler.cpp',
     'request.cpp',
@@ -45,6 +46,7 @@ libcamera_headers = files([
     'include/log.h',
     'include/media_device.h',
     'include/media_object.h',
+    'include/message.h',
     'include/pipeline_handler.h',
     'include/thread.h',
     'include/utils.h',
diff --git a/src/libcamera/message.cpp b/src/libcamera/message.cpp
new file mode 100644
index 000000000000..47caf44dc82d
--- /dev/null
+++ b/src/libcamera/message.cpp
@@ -0,0 +1,71 @@
+/* SPDX-License-Identifier: LGPL-2.1-or-later */
+/*
+ * Copyright (C) 2019, Google Inc.
+ *
+ * message.cpp - Message queue support
+ */
+
+#include "message.h"
+
+#include "log.h"
+
+/**
+ * \file message.h
+ * \brief Message queue support
+ *
+ * The messaging API enables inter-thread communication through message
+ * posting. Messages can be sent from any thread to any recipient deriving from
+ * the Object class.
+ *
+ * The post a message, the sender allocates it dynamically as instance of a
+ * class derived from Message. It then posts the message to an Object recipient
+ * through Object::postMessage(). Message ownership is passed to the object,
+ * the message shall thus not store any temporary data.
+ *
+ * The message is delivered in the context of the object's thread, through the
+ * Object::message() virtual method. After delivery the message is
+ * automatically deleted.
+ */
+
+namespace libcamera {
+
+LOG_DEFINE_CATEGORY(Message)
+
+/**
+ * \class Message
+ * \brief A message that can be posted to a Thread
+ */
+
+/**
+ * \enum Message::Type
+ * \brief The message type
+ * \var Message::None
+ * \brief Invalid message type
+ */
+
+/**
+ * \brief Construct a message object of type \a type
+ * \param[in] type The message type
+ */
+Message::Message(Message::Type type)
+	: type_(type)
+{
+}
+
+Message::~Message()
+{
+}
+
+/**
+ * \fn Message::type()
+ * \brief Retrieve the message type
+ * \return The message type
+ */
+
+/**
+ * \fn Message::receiver()
+ * \brief Retrieve the message receiver
+ * \return The message receiver
+ */
+
+}; /* namespace libcamera */
diff --git a/src/libcamera/object.cpp b/src/libcamera/object.cpp
index a504ca2c9daf..695e6c11b3a4 100644
--- a/src/libcamera/object.cpp
+++ b/src/libcamera/object.cpp
@@ -9,6 +9,9 @@
 
 #include <libcamera/signal.h>
 
+#include "log.h"
+#include "thread.h"
+
 /**
  * \file object.h
  * \brief Base object to support automatic signal disconnection
@@ -24,13 +27,85 @@ namespace libcamera {
  * slots. By inheriting from Object, an object is automatically disconnected
  * from all connected signals when it gets destroyed.
  *
- * \sa Signal
+ * Object instance are bound to the thread in which they're created. When a
+ * message is posted to an object, its handler will run in the object's thread.
+ * This allows implementing easy message passing between threads by inheriting
+ * from the Object class.
+ *
+ * \sa Message, Signal, Thread
  */
 
+Object::Object()
+	: pendingMessages_(0)
+{
+	thread_ = Thread::current();
+}
+
 Object::~Object()
 {
 	for (SignalBase *signal : signals_)
 		signal->disconnect(this);
+
+	if (pendingMessages_)
+		thread()->removeMessages(this);
+}
+
+/**
+ * \brief Post a message to the object's thread
+ * \param[in] msg The message
+ *
+ * This method posts the message \a msg to the message queue of the object's
+ * thread, to be delivered to the object through the message() method in the
+ * context of its thread. Message ownership is passed to the thread, and the
+ * message will be deleted after being delivered.
+ *
+ * Messages are delivered through the thread's event loop. If the thread is not
+ * running its event loop the message will not be delivered until the event
+ * loop gets started.
+ */
+void Object::postMessage(std::unique_ptr<Message> msg)
+{
+	thread()->postMessage(std::move(msg), this);
+}
+
+/**
+ * \brief Message handler for the object
+ * \param[in] msg The message
+ *
+ * This virtual method receives messages for the object. It is called in the
+ * context of the object's thread, and can be overridden to process custom
+ * messages. The parent QObject::message() method shall be called for any
+ * message not handled by the override method.
+ *
+ * The message \a msg is valid only for the duration of the call, no reference
+ * to it shall be kept after this method returns.
+ */
+void Object::message(Message *msg)
+{
+}
+
+/**
+ * \fn Object::thread()
+ * \brief Retrieve the thread the object is bound to
+ * \return The thread the object is bound to
+ */
+
+/**
+ * \brief Move the object to a different thread
+ * \param[in] thread The target thread
+ *
+ * This method moves the object from the current thread to the new \a thread.
+ * It shall be called from the thread in which the object currently lives,
+ * otherwise the behaviour is undefined.
+ */
+void Object::moveToThread(Thread *thread)
+{
+	ASSERT(Thread::current() == thread_);
+
+	if (thread_ == thread)
+		return;
+
+	thread->moveObject(this);
 }
 
 void Object::connect(SignalBase *signal)
diff --git a/src/libcamera/thread.cpp b/src/libcamera/thread.cpp
index 95636ecaab53..5d46eeb8d3a5 100644
--- a/src/libcamera/thread.cpp
+++ b/src/libcamera/thread.cpp
@@ -8,11 +8,13 @@
 #include "thread.h"
 
 #include <atomic>
+#include <list>
 
 #include <libcamera/event_dispatcher.h>
 
 #include "event_dispatcher_poll.h"
 #include "log.h"
+#include "message.h"
 
 /**
  * \file thread.h
@@ -25,6 +27,22 @@ LOG_DEFINE_CATEGORY(Thread)
 
 class ThreadMain;
 
+/**
+ * \brief A queue of posted messages
+ */
+class MessageQueue
+{
+public:
+	/**
+	 * \brief List of queued Message instances
+	 */
+	std::list<std::unique_ptr<Message>> list_;
+	/**
+	 * \brief Protects the \ref list_
+	 */
+	Mutex mutex_;
+};
+
 /**
  * \brief Thread-local internal data
  */
@@ -51,6 +69,8 @@ private:
 
 	std::atomic<bool> exit_;
 	int exitCode_;
+
+	MessageQueue messages_;
 };
 
 /**
@@ -192,8 +212,10 @@ int Thread::exec()
 
 	locker.unlock();
 
-	while (!data_->exit_.load(std::memory_order_acquire))
+	while (!data_->exit_.load(std::memory_order_acquire)) {
+		dispatchMessages();
 		dispatcher->processEvents();
+	}
 
 	locker.lock();
 
@@ -332,4 +354,127 @@ EventDispatcher *Thread::eventDispatcher()
 	return data_->dispatcher_.load(std::memory_order_relaxed);
 }
 
+/**
+ * \brief Post a message to the thread for the \a receiver
+ * \param[in] msg The message
+ * \param[in] receiver The receiver
+ *
+ * This method stores the message \a msg in the message queue of the thread for
+ * the \a receiver and wake up the thread's event loop. Message ownership is
+ * passed to the thread, and the message will be deleted after being delivered.
+ *
+ * Messages are delivered through the thread's event loop. If the thread is not
+ * running its event loop the message will not be delivered until the event
+ * loop gets started.
+ *
+ * If the \a receiver is not bound to this thread the behaviour is undefined.
+ *
+ * \sa exec()
+ */
+void Thread::postMessage(std::unique_ptr<Message> msg, Object *receiver)
+{
+	msg->receiver_ = receiver;
+
+	ASSERT(data_ == receiver->thread()->data_);
+
+	MutexLocker locker(data_->messages_.mutex_);
+	data_->messages_.list_.push_back(std::move(msg));
+	receiver->pendingMessages_++;
+	locker.unlock();
+
+	EventDispatcher *dispatcher =
+		data_->dispatcher_.load(std::memory_order_acquire);
+	if (dispatcher)
+		dispatcher->interrupt();
+}
+
+/**
+ * \brief Remove all posted messages for the \a receiver
+ * \param[in] receiver The receiver
+ *
+ * If the \a receiver is not bound to this thread the behaviour is undefined.
+ */
+void Thread::removeMessages(Object *receiver)
+{
+	ASSERT(data_ == receiver->thread()->data_);
+
+	MutexLocker locker(data_->messages_.mutex_);
+	if (!receiver->pendingMessages_)
+		return;
+
+	std::vector<std::unique_ptr<Message>> toDelete;
+	for (std::unique_ptr<Message> &msg : data_->messages_.list_) {
+		if (!msg)
+			continue;
+		if (msg->receiver_ != receiver)
+			continue;
+
+		/*
+		 * Move the message to the pending deletion list to delete it
+		 * after releasing the lock. The messages list element will
+		 * contain a null pointer, and will be removed when dispatching
+		 * messages.
+		 */
+		toDelete.push_back(std::move(msg));
+		receiver->pendingMessages_--;
+	}
+
+	ASSERT(!receiver->pendingMessages_);
+	locker.unlock();
+
+	toDelete.clear();
+}
+
+/**
+ * \brief Dispatch all posted messages for this thread
+ */
+void Thread::dispatchMessages()
+{
+	MutexLocker locker(data_->messages_.mutex_);
+
+	while (!data_->messages_.list_.empty()) {
+		std::unique_ptr<Message> msg = std::move(data_->messages_.list_.front());
+		data_->messages_.list_.pop_front();
+		if (!msg)
+			continue;
+
+		Object *receiver = msg->receiver_;
+		ASSERT(data_ == receiver->thread()->data_);
+
+		locker.unlock();
+		receiver->message(msg.get());
+		locker.lock();
+
+		receiver->pendingMessages_--;
+	}
+}
+
+/**
+ * \brief Move an \a object to the thread
+ * \param[in] object The object
+ */
+void Thread::moveObject(Object *object)
+{
+	ThreadData *currentData = object->thread_->data_;
+	ThreadData *targetData = data_;
+
+	MutexLocker lockerFrom(currentData->mutex_, std::defer_lock);
+	MutexLocker lockerTo(targetData->mutex_, std::defer_lock);
+	std::lock(lockerFrom, lockerTo);
+
+	/* Move pending messages to the message queue of the new thread. */
+	if (object->pendingMessages_) {
+		for (std::unique_ptr<Message> &msg : currentData->messages_.list_) {
+			if (!msg)
+				continue;
+			if (msg->receiver_ != object)
+				continue;
+
+			targetData->messages_.list_.push_back(std::move(msg));
+		}
+	}
+
+	object->thread_ = this;
+}
+
 }; /* namespace libcamera */
-- 
Regards,

Laurent Pinchart



More information about the libcamera-devel mailing list