[PATCH v1 1/1] libcamera: Introduce scheduler and task for complex pipeline data flow

Harvey Yang chenghaoyang at chromium.org
Mon Aug 26 21:26:55 CEST 2024


The patch introduces scheduler and task to define the data flow and task
ordering for complex pipeline more easily. The scheduler schedules tasks
in a topological order from partial relation defined for each task when a
request comes in.

Signed-off-by: Han-Lin Chen <hanlinchen at chromium.org>
Co-developed-by: Harvey Yang <chenghaoyang at chromium.org>
---
 include/libcamera/internal/meson.build      |   1 +
 include/libcamera/internal/task_scheduler.h | 116 ++++++++
 src/libcamera/meson.build                   |   1 +
 src/libcamera/task_scheduler.cpp            | 301 ++++++++++++++++++++
 4 files changed, 419 insertions(+)
 create mode 100644 include/libcamera/internal/task_scheduler.h
 create mode 100644 src/libcamera/task_scheduler.cpp

diff --git a/include/libcamera/internal/meson.build b/include/libcamera/internal/meson.build
index 1c5eef9ca..62e49ba60 100644
--- a/include/libcamera/internal/meson.build
+++ b/include/libcamera/internal/meson.build
@@ -37,6 +37,7 @@ libcamera_internal_headers = files([
     'shared_mem_object.h',
     'source_paths.h',
     'sysfs.h',
+    'task_scheduler.h',
     'v4l2_device.h',
     'v4l2_pixelformat.h',
     'v4l2_subdevice.h',
diff --git a/include/libcamera/internal/task_scheduler.h b/include/libcamera/internal/task_scheduler.h
new file mode 100644
index 000000000..5a325b87b
--- /dev/null
+++ b/include/libcamera/internal/task_scheduler.h
@@ -0,0 +1,116 @@
+/* SPDX-License-Identifier: LGPL-2.1-or-later */
+/*
+ * Copyright (C) 2024, Google Inc.
+ *
+ * task_scheduler.h - A task scheduler
+ */
+
+#pragma once
+
+#include <chrono>
+#include <list>
+#include <map>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+
+#include <libcamera/base/object.h>
+#include <libcamera/base/timer.h>
+
+namespace libcamera {
+
+class Scheduler;
+
+class Task
+{
+public:
+	Task(Scheduler *scheduler, const std::string &id = "");
+	virtual ~Task() = default;
+
+	virtual void notifyDone();
+
+	virtual void run() = 0;
+	std::string &id() { return id_; }
+
+	bool isRunning() { return running_; }
+
+protected:
+	Scheduler *scheduler_;
+	std::string id_;
+
+private:
+	friend Scheduler;
+
+	bool running_ = false;
+
+	void depend(Task *task);
+	size_t removeDependency(Task *task);
+	void launch();
+
+	std::list<Task *> precedents_;
+	std::list<Task *> succedents_;
+
+	std::chrono::steady_clock::time_point launchTime_;
+};
+
+class Scheduler : public Object
+{
+public:
+	static void precede(Task *precedent, Task *task);
+
+	Scheduler();
+
+	void schedule();
+	void log();
+
+protected:
+	void queueTask(Task *task, int32_t group);
+	void succeedPrevTaskByStep(int32_t group, size_t step, Task *task);
+	std::list<Task *> &groupTasks(int32_t group);
+
+	std::map<int32_t, std::string> groupNames_;
+
+private:
+	friend Task;
+
+	void removeFromGroupTasks(Task *task);
+
+	void taskDone(Task *task);
+	Signal<Task *> taskDone_;
+
+	std::unordered_map<Task *, std::unique_ptr<Task>> tasksHolder_;
+
+	std::map<int32_t, std::list<Task *>> groupTasks_;
+	std::unordered_set<Task *> pendingTasks_;
+	std::unordered_set<Task *> runningTasks_;
+};
+
+template<typename Category, std::enable_if_t<std::is_enum_v<Category>> * = nullptr>
+class CategorizedScheduler : public Scheduler
+{
+	static_assert(std::is_enum<Category>::value, "Category should be an enum");
+
+public:
+	CategorizedScheduler(const std::map<Category, std::string> &categoryNames)
+	{
+		for (auto &[group, name] : categoryNames)
+			Scheduler::groupNames_[(int32_t)group] = name;
+	}
+
+	void queueTask(Task *task, Category group)
+	{
+		Scheduler::queueTask(task, static_cast<int32_t>(group));
+	}
+
+	std::list<Task *> &groupTasks(Category group)
+	{
+		return Scheduler::groupTasks(static_cast<int32_t>(group));
+	}
+
+	void succeedPrevTaskByStep(Category group, size_t step, Task *task)
+	{
+		Scheduler::succeedPrevTaskByStep(static_cast<uint32_t>(group), step, task);
+	}
+};
+
+} /* namespace libcamera */
diff --git a/src/libcamera/meson.build b/src/libcamera/meson.build
index aa9ab0291..e790813d1 100644
--- a/src/libcamera/meson.build
+++ b/src/libcamera/meson.build
@@ -47,6 +47,7 @@ libcamera_internal_sources = files([
     'shared_mem_object.cpp',
     'source_paths.cpp',
     'sysfs.cpp',
+    'task_scheduler.cpp',
     'v4l2_device.cpp',
     'v4l2_pixelformat.cpp',
     'v4l2_subdevice.cpp',
diff --git a/src/libcamera/task_scheduler.cpp b/src/libcamera/task_scheduler.cpp
new file mode 100644
index 000000000..7cc471d89
--- /dev/null
+++ b/src/libcamera/task_scheduler.cpp
@@ -0,0 +1,301 @@
+/* SPDX-License-Identifier: LGPL-2.1-or-later */
+/*
+ * Copyright (C) 2024, Google Inc.
+ *
+ * task_scheduler.cpp - A task scheduler
+ */
+
+#include "libcamera/internal/task_scheduler.h"
+
+#include <libcamera/base/log.h>
+
+/**
+ * \internal
+ * \file task_scheduler.h
+ * \brief Task and (Categorized)Scheduler that run tasks based on dependencies
+ */
+
+namespace libcamera {
+
+LOG_DEFINE_CATEGORY(Task)
+
+/**
+ * \class Task
+ * \brief A set of operations to execute as an individual Task
+ *
+ * The Task class contains a set of operations to be executed. It may have
+ * dependencies on other tasks, and should only be executed by the
+ * Scheduler it belongs to when all of its dependency tasks are executed.
+ *
+ * Upon its end of execution, it should notify its Scheduler by invoking
+ * Scheduler::taskDone.
+ */
+
+/**
+ * \brief Construct a Task instance
+ * \param[in] scheduler The Scheduler this task belongs to
+ * \param[in] id The ID of this task, only being used in log
+ */
+Task::Task(Scheduler *scheduler, const std::string &id)
+	: scheduler_(scheduler), id_(id)
+{
+}
+
+/**
+ * \brief Notify Scheduler the task has completed
+ *
+ * It's only called when all operations in this task is done, and it notifies
+ * the Scheduler so that the dependencies of its succedents can be resolved.
+ *
+ * Some derived classes might override this function to do some cleanup at the
+ * end of the Task. However, notifying the Scheduler should still be done.
+ */
+void Task::notifyDone()
+{
+	scheduler_->invokeMethod(&Scheduler::taskDone, ConnectionTypeQueued, this);
+}
+
+/**
+ * \fn Task::run()
+ * \brief Run the operations defined by the derived class
+ *
+ * This function runs the operations that should be done, or wait for some
+ * callbacks / events, like ISP processing frames, or IPA returning metadata.
+ */
+
+/**
+ * \fn Task::id()
+ * \brief The ID of the Task
+ */
+
+/**
+ * \fn Task::isRunning()
+ * \brief The state if the task has already been running
+ *
+ * It's mostly used after the user calls Scheduler::groupTasks, and needs to
+ * decide if it should use or depend on a Task based on its state.
+ */
+
+/**
+ * \var Task::scheduler_
+ * \brief The Scheduler this Task belongs to
+ */
+
+/**
+ * \var Task::id_
+ * \brief The id of this Task
+ */
+
+size_t Task::removeDependency(Task *task)
+{
+	precedents_.remove(task);
+	return precedents_.size();
+}
+
+void Task::depend(Task *task)
+{
+	precedents_.emplace_back(task);
+	task->succedents_.emplace_back(this);
+}
+
+void Task::launch()
+{
+	ASSERT(precedents_.empty());
+
+	running_ = true;
+	launchTime_ = std::chrono::steady_clock::now();
+
+	auto *method = new BoundMethodMember{
+		this, scheduler_, &Task::run, ConnectionTypeQueued
+	};
+
+	method->activate();
+}
+
+/**
+ * \class Scheduler
+ * \brief Scheduler to run Tasks based on their dependencies
+ *
+ * This is the very basic implementation of Scheduler that runs Tasks when
+ * they don't have any dependency left. It doesn't have priorities among Tasks.
+ */
+
+/**
+ * \brief Adds a dependency from \a precedent to \a task
+ * \param[in] precedent The precedent Task that is a dependency
+ * \param[in] task The Task that needs to set a dependency
+ */
+void Scheduler::precede(Task *precedent, Task *task)
+{
+	ASSERT(task && precedent);
+	task->depend(precedent);
+}
+
+Scheduler::Scheduler() = default;
+
+/**
+ * \brief Launch all Tasks that don't have any dependency
+ *
+ * This is mostly called after some Tasks are queued into the Scheduler.
+ */
+void Scheduler::schedule()
+{
+	for (auto it = pendingTasks_.begin(); it != pendingTasks_.end();) {
+		auto *task = *it;
+		if (!task->precedents_.empty()) {
+			it++;
+			continue;
+		}
+
+		runningTasks_.emplace(task);
+		it = pendingTasks_.erase(it);
+
+		task->launch();
+	}
+}
+
+/**
+ * \brief Log all non-finished Tasks, grouped by the group ids
+ */
+void Scheduler::log()
+{
+	std::stringstream ss;
+	for (auto &[group, name] : groupNames_)
+		ss << name << "[" << groupTasks_[group].size() << "] ";
+
+	LOG(Task, Info) << ss.str();
+}
+
+/**
+ * \brief Queues \a task into the Scheduler, which takes the ownership of the
+ * Task
+ * \param[in] task The task being queued and will be executed
+ * \param[in] group The group ID that the Task belongs to
+ *
+ * Note that \a task is passed as a raw pointer, to make the developer create
+ * and pass Task pointers more easily without holding both raw pointers and
+ * unique pointers. Scheduler will then construct a std::unique_ptr to take the
+ * ownership.
+ *
+ * The task will then be held as a pending task. The user of Scheduler should
+ * call Scheduler::schedule() to make new tasks without dependencies start to
+ * be executed. A Task with dependencies will be executed when its last
+ * precedent Task is done.
+ */
+void Scheduler::queueTask(Task *task, int32_t group)
+{
+	/* \todo: Detect cyclic dependency */
+	tasksHolder_.emplace(task, std::unique_ptr<Task>(task));
+
+	pendingTasks_.emplace(task);
+	groupTasks_[group].emplace_back(task);
+}
+
+/**
+ * \brief Sets a dependency from the \a step th latest Task in group \a group
+ * to \a task
+ * \param[in] group The group ID
+ * \param[in] step The number of Tasks to trace back, 0 being the latest one
+ * \param[in] task The Task that needs to add a dependency
+ *
+ * If the number of Tasks in group \a group is less than or equals to \a step,
+ * such a dependent Task doesn't exist, and no dependency will be added.
+ */
+void Scheduler::succeedPrevTaskByStep(int32_t group, size_t step, Task *task)
+{
+	ASSERT(task);
+
+	auto &tasks = groupTasks_[group];
+	if (tasks.size() <= step)
+		return;
+
+	auto iter = tasks.rbegin();
+	for (size_t i = 0; i < step; i++)
+		iter++;
+
+	precede(*iter, task);
+}
+
+/**
+ * \brief Return a list of unfinished Tasks that belong to the group ID
+ * \param[in] group The group ID
+ */
+std::list<Task *> &Scheduler::groupTasks(int32_t group)
+{
+	return groupTasks_[group];
+}
+
+/**
+ * \var Scheduler::groupNames_
+ * \brief The map from group ID to group name
+ *
+ * Should be set by the Scheduler's derived class
+ */
+
+void Scheduler::removeFromGroupTasks(Task *task)
+{
+	for (auto &[group, tasks] : groupTasks_)
+		tasks.remove(task);
+}
+
+void Scheduler::taskDone(Task *task)
+{
+	/* Sample execution time of the task, from launch to notifyDone */
+	std::chrono::milliseconds milliseconds =
+		std::chrono::duration_cast<std::chrono::milliseconds>(
+			std::chrono::steady_clock::now() - task->launchTime_);
+
+	LOG(Task, Debug) << "Task " << task->id() << " executed in "
+			 << milliseconds.count() << "ms";
+
+	taskDone_.emit(task);
+
+	runningTasks_.erase(task);
+	removeFromGroupTasks(task);
+
+	for (auto *succedent : task->succedents_) {
+		if (0 == succedent->removeDependency(task)) {
+			runningTasks_.emplace(succedent);
+			pendingTasks_.erase(succedent);
+
+			succedent->launch();
+		}
+	}
+
+	tasksHolder_.erase(task);
+}
+
+/**
+ * \class CategorizedScheduler
+ * \brief A Scheduler that uses an enum Category as the Task's group ID
+ */
+
+/**
+ * \fn CategorizedScheduler::CategorizedScheduler(
+ *      const std::map<Category, std::string> &categoryNames)
+ * \brief Construct a CategorizedScheduler with category names
+ * \param[in] categoryNames The map from enum group IDs to group names
+ */
+
+/**
+ * \fn void CategorizedScheduler::queueTask(Task *task, Category group)
+ * \brief Call Scheduler::queueTask with Category enum as the group ID
+ * \param[in] task The task being queued and will be executed
+ * \param[in] group The group ID in enum Category that the Task belongs to
+ */
+
+/**
+ * \fn std::list<Task *> &CategorizedScheduler::groupTasks(Category group)
+ * \brief Call Scheduler::groupTasks with Category enum as the group ID
+ * \param[in] group The group ID in enum Category
+ */
+
+/**
+ * \fn void CategorizedScheduler::succeedPrevTaskByStep(Category group, size_t step, Task *task)
+ * \brief Call Scheduler::succeedPrevTaskByStep with Category enum as the group ID
+ * \param[in] group The group ID in enum Category
+ * \param[in] step The number of Tasks to trace back, 0 being the latest one
+ * \param[in] task The Task that needs to add a dependency
+ */
+
+} /* namespace libcamera */
-- 
2.46.0.295.g3b9ea8a38a-goog



More information about the libcamera-devel mailing list