[libcamera-devel] [PATCH 2/6] libcamera: thread: Add a messaging passing API
Niklas Söderlund
niklas.soderlund at ragnatech.se
Thu Jul 11 07:24:51 CEST 2019
Hi Laurent,
Thanks for your patch.
On 2019-07-10 22:17:04 +0300, Laurent Pinchart wrote:
> Create a new Message class to model a message that can be passed to an
> object living in another thread. Only an invalid message type is
> currently defined, more messages will be added in the future.
>
> The Thread class is extended with a messages queue, and the Object class
> with thread affinity.
>
> Signed-off-by: Laurent Pinchart <laurent.pinchart at ideasonboard.com>
> ---
> include/libcamera/object.h | 13 +++
> src/libcamera/include/message.h | 37 ++++++++
> src/libcamera/include/thread.h | 9 ++
> src/libcamera/meson.build | 2 +
> src/libcamera/message.cpp | 71 +++++++++++++++
> src/libcamera/object.cpp | 77 ++++++++++++++++-
> src/libcamera/thread.cpp | 147 +++++++++++++++++++++++++++++++-
> 7 files changed, 354 insertions(+), 2 deletions(-)
> create mode 100644 src/libcamera/include/message.h
> create mode 100644 src/libcamera/message.cpp
>
> diff --git a/include/libcamera/object.h b/include/libcamera/object.h
> index eadd41f9a41f..d61dfb1ebaef 100644
> --- a/include/libcamera/object.h
> +++ b/include/libcamera/object.h
> @@ -8,26 +8,39 @@
> #define __LIBCAMERA_OBJECT_H__
>
> #include <list>
> +#include <memory>
>
> namespace libcamera {
>
> +class Message;
> class SignalBase;
> template<typename... Args>
> class Signal;
> +class Thread;
>
> class Object
> {
> public:
> + Object();
> virtual ~Object();
>
> + void postMessage(std::unique_ptr<Message> msg);
> + virtual void message(Message *msg);
> +
> + Thread *thread() const { return thread_; }
> + void moveToThread(Thread *thread);
> +
> private:
> template<typename... Args>
> friend class Signal;
> + friend class Thread;
>
> void connect(SignalBase *signal);
> void disconnect(SignalBase *signal);
>
> + Thread *thread_;
> std::list<SignalBase *> signals_;
> + unsigned int pendingMessages_;
> };
>
> }; /* namespace libcamera */
> diff --git a/src/libcamera/include/message.h b/src/libcamera/include/message.h
> new file mode 100644
> index 000000000000..97c9b80ec0e0
> --- /dev/null
> +++ b/src/libcamera/include/message.h
> @@ -0,0 +1,37 @@
> +/* SPDX-License-Identifier: LGPL-2.1-or-later */
> +/*
> + * Copyright (C) 2019, Google Inc.
> + *
> + * message.h - Message queue support
> + */
> +#ifndef __LIBCAMERA_MESSAGE_H__
> +#define __LIBCAMERA_MESSAGE_H__
> +
> +namespace libcamera {
> +
> +class Object;
> +class Thread;
> +
> +class Message
> +{
> +public:
> + enum Type {
> + None = 0,
> + };
> +
> + Message(Type type);
> + virtual ~Message();
> +
> + Type type() const { return type_; }
> + Object *receiver() const { return receiver_; }
> +
> +private:
> + friend class Thread;
> +
> + Type type_;
> + Object *receiver_;
> +};
> +
> +} /* namespace libcamera */
> +
> +#endif /* __LIBCAMERA_MESSAGE_H__ */
> diff --git a/src/libcamera/include/thread.h b/src/libcamera/include/thread.h
> index e881d90e9367..acae91cb6457 100644
> --- a/src/libcamera/include/thread.h
> +++ b/src/libcamera/include/thread.h
> @@ -16,6 +16,8 @@
> namespace libcamera {
>
> class EventDispatcher;
> +class Message;
> +class Object;
> class ThreadData;
> class ThreadMain;
>
> @@ -49,9 +51,16 @@ private:
> void startThread();
> void finishThread();
>
> + void postMessage(std::unique_ptr<Message> msg, Object *receiver);
> + void removeMessages(Object *receiver);
> + void dispatchMessages();
> +
> + friend class Object;
> friend class ThreadData;
> friend class ThreadMain;
>
> + void moveObject(Object *object);
> +
> std::thread thread_;
> ThreadData *data_;
> };
> diff --git a/src/libcamera/meson.build b/src/libcamera/meson.build
> index bf71524f768c..3e5097a4cdc7 100644
> --- a/src/libcamera/meson.build
> +++ b/src/libcamera/meson.build
> @@ -18,6 +18,7 @@ libcamera_sources = files([
> 'log.cpp',
> 'media_device.cpp',
> 'media_object.cpp',
> + 'message.cpp',
> 'object.cpp',
> 'pipeline_handler.cpp',
> 'request.cpp',
> @@ -45,6 +46,7 @@ libcamera_headers = files([
> 'include/log.h',
> 'include/media_device.h',
> 'include/media_object.h',
> + 'include/message.h',
> 'include/pipeline_handler.h',
> 'include/thread.h',
> 'include/utils.h',
> diff --git a/src/libcamera/message.cpp b/src/libcamera/message.cpp
> new file mode 100644
> index 000000000000..47caf44dc82d
> --- /dev/null
> +++ b/src/libcamera/message.cpp
> @@ -0,0 +1,71 @@
> +/* SPDX-License-Identifier: LGPL-2.1-or-later */
> +/*
> + * Copyright (C) 2019, Google Inc.
> + *
> + * message.cpp - Message queue support
> + */
> +
> +#include "message.h"
> +
> +#include "log.h"
> +
> +/**
> + * \file message.h
> + * \brief Message queue support
> + *
> + * The messaging API enables inter-thread communication through message
> + * posting. Messages can be sent from any thread to any recipient deriving from
> + * the Object class.
> + *
> + * The post a message, the sender allocates it dynamically as instance of a
s/The/To/
Reviewed-by: Niklas Söderlund <niklas.soderlund at ragnatech.se>
> + * class derived from Message. It then posts the message to an Object recipient
> + * through Object::postMessage(). Message ownership is passed to the object,
> + * the message shall thus not store any temporary data.
> + *
> + * The message is delivered in the context of the object's thread, through the
> + * Object::message() virtual method. After delivery the message is
> + * automatically deleted.
> + */
> +
> +namespace libcamera {
> +
> +LOG_DEFINE_CATEGORY(Message)
> +
> +/**
> + * \class Message
> + * \brief A message that can be posted to a Thread
> + */
> +
> +/**
> + * \enum Message::Type
> + * \brief The message type
> + * \var Message::None
> + * \brief Invalid message type
> + */
> +
> +/**
> + * \brief Construct a message object of type \a type
> + * \param[in] type The message type
> + */
> +Message::Message(Message::Type type)
> + : type_(type)
> +{
> +}
> +
> +Message::~Message()
> +{
> +}
> +
> +/**
> + * \fn Message::type()
> + * \brief Retrieve the message type
> + * \return The message type
> + */
> +
> +/**
> + * \fn Message::receiver()
> + * \brief Retrieve the message receiver
> + * \return The message receiver
> + */
> +
> +}; /* namespace libcamera */
> diff --git a/src/libcamera/object.cpp b/src/libcamera/object.cpp
> index a504ca2c9daf..695e6c11b3a4 100644
> --- a/src/libcamera/object.cpp
> +++ b/src/libcamera/object.cpp
> @@ -9,6 +9,9 @@
>
> #include <libcamera/signal.h>
>
> +#include "log.h"
> +#include "thread.h"
> +
> /**
> * \file object.h
> * \brief Base object to support automatic signal disconnection
> @@ -24,13 +27,85 @@ namespace libcamera {
> * slots. By inheriting from Object, an object is automatically disconnected
> * from all connected signals when it gets destroyed.
> *
> - * \sa Signal
> + * Object instance are bound to the thread in which they're created. When a
> + * message is posted to an object, its handler will run in the object's thread.
> + * This allows implementing easy message passing between threads by inheriting
> + * from the Object class.
> + *
> + * \sa Message, Signal, Thread
> */
>
> +Object::Object()
> + : pendingMessages_(0)
> +{
> + thread_ = Thread::current();
> +}
> +
> Object::~Object()
> {
> for (SignalBase *signal : signals_)
> signal->disconnect(this);
> +
> + if (pendingMessages_)
> + thread()->removeMessages(this);
> +}
> +
> +/**
> + * \brief Post a message to the object's thread
> + * \param[in] msg The message
> + *
> + * This method posts the message \a msg to the message queue of the object's
> + * thread, to be delivered to the object through the message() method in the
> + * context of its thread. Message ownership is passed to the thread, and the
> + * message will be deleted after being delivered.
> + *
> + * Messages are delivered through the thread's event loop. If the thread is not
> + * running its event loop the message will not be delivered until the event
> + * loop gets started.
> + */
> +void Object::postMessage(std::unique_ptr<Message> msg)
> +{
> + thread()->postMessage(std::move(msg), this);
> +}
> +
> +/**
> + * \brief Message handler for the object
> + * \param[in] msg The message
> + *
> + * This virtual method receives messages for the object. It is called in the
> + * context of the object's thread, and can be overridden to process custom
> + * messages. The parent QObject::message() method shall be called for any
> + * message not handled by the override method.
> + *
> + * The message \a msg is valid only for the duration of the call, no reference
> + * to it shall be kept after this method returns.
> + */
> +void Object::message(Message *msg)
> +{
> +}
> +
> +/**
> + * \fn Object::thread()
> + * \brief Retrieve the thread the object is bound to
> + * \return The thread the object is bound to
> + */
> +
> +/**
> + * \brief Move the object to a different thread
> + * \param[in] thread The target thread
> + *
> + * This method moves the object from the current thread to the new \a thread.
> + * It shall be called from the thread in which the object currently lives,
> + * otherwise the behaviour is undefined.
> + */
> +void Object::moveToThread(Thread *thread)
> +{
> + ASSERT(Thread::current() == thread_);
> +
> + if (thread_ == thread)
> + return;
> +
> + thread->moveObject(this);
> }
>
> void Object::connect(SignalBase *signal)
> diff --git a/src/libcamera/thread.cpp b/src/libcamera/thread.cpp
> index 95636ecaab53..5d46eeb8d3a5 100644
> --- a/src/libcamera/thread.cpp
> +++ b/src/libcamera/thread.cpp
> @@ -8,11 +8,13 @@
> #include "thread.h"
>
> #include <atomic>
> +#include <list>
>
> #include <libcamera/event_dispatcher.h>
>
> #include "event_dispatcher_poll.h"
> #include "log.h"
> +#include "message.h"
>
> /**
> * \file thread.h
> @@ -25,6 +27,22 @@ LOG_DEFINE_CATEGORY(Thread)
>
> class ThreadMain;
>
> +/**
> + * \brief A queue of posted messages
> + */
> +class MessageQueue
> +{
> +public:
> + /**
> + * \brief List of queued Message instances
> + */
> + std::list<std::unique_ptr<Message>> list_;
> + /**
> + * \brief Protects the \ref list_
> + */
> + Mutex mutex_;
> +};
> +
> /**
> * \brief Thread-local internal data
> */
> @@ -51,6 +69,8 @@ private:
>
> std::atomic<bool> exit_;
> int exitCode_;
> +
> + MessageQueue messages_;
> };
>
> /**
> @@ -192,8 +212,10 @@ int Thread::exec()
>
> locker.unlock();
>
> - while (!data_->exit_.load(std::memory_order_acquire))
> + while (!data_->exit_.load(std::memory_order_acquire)) {
> + dispatchMessages();
> dispatcher->processEvents();
> + }
>
> locker.lock();
>
> @@ -332,4 +354,127 @@ EventDispatcher *Thread::eventDispatcher()
> return data_->dispatcher_.load(std::memory_order_relaxed);
> }
>
> +/**
> + * \brief Post a message to the thread for the \a receiver
> + * \param[in] msg The message
> + * \param[in] receiver The receiver
> + *
> + * This method stores the message \a msg in the message queue of the thread for
> + * the \a receiver and wake up the thread's event loop. Message ownership is
> + * passed to the thread, and the message will be deleted after being delivered.
> + *
> + * Messages are delivered through the thread's event loop. If the thread is not
> + * running its event loop the message will not be delivered until the event
> + * loop gets started.
> + *
> + * If the \a receiver is not bound to this thread the behaviour is undefined.
> + *
> + * \sa exec()
> + */
> +void Thread::postMessage(std::unique_ptr<Message> msg, Object *receiver)
> +{
> + msg->receiver_ = receiver;
> +
> + ASSERT(data_ == receiver->thread()->data_);
> +
> + MutexLocker locker(data_->messages_.mutex_);
> + data_->messages_.list_.push_back(std::move(msg));
> + receiver->pendingMessages_++;
> + locker.unlock();
> +
> + EventDispatcher *dispatcher =
> + data_->dispatcher_.load(std::memory_order_acquire);
> + if (dispatcher)
> + dispatcher->interrupt();
> +}
> +
> +/**
> + * \brief Remove all posted messages for the \a receiver
> + * \param[in] receiver The receiver
> + *
> + * If the \a receiver is not bound to this thread the behaviour is undefined.
> + */
> +void Thread::removeMessages(Object *receiver)
> +{
> + ASSERT(data_ == receiver->thread()->data_);
> +
> + MutexLocker locker(data_->messages_.mutex_);
> + if (!receiver->pendingMessages_)
> + return;
> +
> + std::vector<std::unique_ptr<Message>> toDelete;
> + for (std::unique_ptr<Message> &msg : data_->messages_.list_) {
> + if (!msg)
> + continue;
> + if (msg->receiver_ != receiver)
> + continue;
> +
> + /*
> + * Move the message to the pending deletion list to delete it
> + * after releasing the lock. The messages list element will
> + * contain a null pointer, and will be removed when dispatching
> + * messages.
> + */
> + toDelete.push_back(std::move(msg));
> + receiver->pendingMessages_--;
> + }
> +
> + ASSERT(!receiver->pendingMessages_);
> + locker.unlock();
> +
> + toDelete.clear();
> +}
> +
> +/**
> + * \brief Dispatch all posted messages for this thread
> + */
> +void Thread::dispatchMessages()
> +{
> + MutexLocker locker(data_->messages_.mutex_);
> +
> + while (!data_->messages_.list_.empty()) {
> + std::unique_ptr<Message> msg = std::move(data_->messages_.list_.front());
> + data_->messages_.list_.pop_front();
> + if (!msg)
> + continue;
> +
> + Object *receiver = msg->receiver_;
> + ASSERT(data_ == receiver->thread()->data_);
> +
> + locker.unlock();
> + receiver->message(msg.get());
> + locker.lock();
> +
> + receiver->pendingMessages_--;
> + }
> +}
> +
> +/**
> + * \brief Move an \a object to the thread
> + * \param[in] object The object
> + */
> +void Thread::moveObject(Object *object)
> +{
> + ThreadData *currentData = object->thread_->data_;
> + ThreadData *targetData = data_;
> +
> + MutexLocker lockerFrom(currentData->mutex_, std::defer_lock);
> + MutexLocker lockerTo(targetData->mutex_, std::defer_lock);
> + std::lock(lockerFrom, lockerTo);
> +
> + /* Move pending messages to the message queue of the new thread. */
> + if (object->pendingMessages_) {
> + for (std::unique_ptr<Message> &msg : currentData->messages_.list_) {
> + if (!msg)
> + continue;
> + if (msg->receiver_ != object)
> + continue;
> +
> + targetData->messages_.list_.push_back(std::move(msg));
> + }
> + }
> +
> + object->thread_ = this;
> +}
> +
> }; /* namespace libcamera */
> --
> Regards,
>
> Laurent Pinchart
>
> _______________________________________________
> libcamera-devel mailing list
> libcamera-devel at lists.libcamera.org
> https://lists.libcamera.org/listinfo/libcamera-devel
--
Regards,
Niklas Söderlund
More information about the libcamera-devel
mailing list