Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update lock-free queue for multi-threading, add remove task process in the wait method of Task. #436

Merged
merged 8 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 9 additions & 17 deletions include/tgfx/core/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#pragma once

#include <atomic>
#include <condition_variable>
#include <functional>
#include <memory>
Expand All @@ -26,6 +27,8 @@
namespace tgfx {
class TaskGroup;

/**
 * Describes where a Task currently is in its lifecycle.
 */
enum class TaskStatus {
  queuing,    // Initial state: the code block has not started running yet.
  executing,  // The code block is currently running.
  finished,   // The code block has run to completion.
  cancelled   // cancel() was called on the Task.
};

/**
* The Task class manages the concurrent execution of one or more code blocks.
*/
Expand All @@ -38,21 +41,6 @@ class Task {
*/
static std::shared_ptr<Task> Run(std::function<void()> block);

/**
* Returns true if the Task is currently executing its code block.
*/
bool executing();

/**
* Returns true if the Task has been cancelled
*/
bool cancelled();

/**
* Returns true if the Task has finished executing its code block.
*/
bool finished();

/**
* Advises the Task that it should stop executing its code block. Cancellation does not affect the
* execution of a Task that has already begun.
Expand All @@ -66,11 +54,15 @@ class Task {
*/
void wait();

/**
* Indicates the current status of the Task. To ensure thread safety, read the data using the
* following method: taskStatus.load(std::memory_order_relaxed).
*/
std::atomic<TaskStatus> taskStatus = TaskStatus::queuing;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

加个只读方法,不能直接这么暴露出来。


private:
std::mutex locker = {};
std::condition_variable condition = {};
bool _executing = true;
bool _cancelled = false;
std::function<void()> block = nullptr;

explicit Task(std::function<void()> block);
Expand Down
49 changes: 27 additions & 22 deletions src/core/utils/LockFreeQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,27 @@

namespace tgfx {
// Alignment, in bytes, used as the alignas value for the queue's atomic position counters (see the
// DISABLE_ALIGNAS conditional in LockFreeQueue).
static constexpr size_t CACHELINE_SIZE = 64;
// QUEUE_SIZE needs to be a power of 2, otherwise the implementation of GetIndex needs to be changed.
static constexpr uint32_t QUEUE_SIZE = 1024;

// Wraps a position counter into the range [0, QUEUE_SIZE). The bitwise AND only behaves like a
// modulo because QUEUE_SIZE is a power of 2.
inline uint32_t GetIndex(uint32_t position) {
  constexpr uint32_t slotMask = QUEUE_SIZE - 1;
  return position & slotMask;
}

template <typename T>
class LockFreeQueue {
public:
LockFreeQueue() {
queuePool = reinterpret_cast<T*>(std::calloc(QUEUE_SIZE, sizeof(T)));
/**
* Creates a queue with the given capacity. The capacity should be a power of 2; if it is not, it
* is rounded up to the next power of 2.
* @param capacity The requested number of slots in the queue.
*/
explicit LockFreeQueue(uint32_t capacity) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个Capacity还有一种写法,可以声明到template的参数里, template <typename T, size_t N>, 这样就可以直接声明了。

if ((capacity & (capacity - 1)) != 0) {
_capacity = 1;
while (_capacity < capacity) {
_capacity <<= 1;
}
} else {
_capacity = capacity;
}
queuePool = reinterpret_cast<T*>(std::calloc(_capacity, sizeof(T)));
if (queuePool == nullptr) {
LOGE("LockFreeQueue init Failed!\n");
return;
ABORT("LockFreeQueue init Failed!\n");
}
}

Expand All @@ -59,9 +65,6 @@ class LockFreeQueue {
}

T dequeue() {
if (queuePool == nullptr) {
return nullptr;
}
uint32_t newHead = 0;
uint32_t oldHead = head.load(std::memory_order_relaxed);
T element = nullptr;
Expand All @@ -71,11 +74,11 @@ class LockFreeQueue {
if (newHead == tailPosition.load(std::memory_order_acquire)) {
return nullptr;
}
element = queuePool[GetIndex(newHead)];
element = queuePool[getIndex(newHead)];
} while (!head.compare_exchange_weak(oldHead, newHead, std::memory_order_acq_rel,
std::memory_order_relaxed));

queuePool[GetIndex(newHead)] = nullptr;
queuePool[getIndex(newHead)] = nullptr;

uint32_t newHeadPosition = 0;
uint32_t oldHeadPosition = headPosition.load(std::memory_order_relaxed);
Expand All @@ -87,22 +90,19 @@ class LockFreeQueue {
}

bool enqueue(const T& element) {
if (queuePool == nullptr) {
return false;
}
uint32_t newTail = 0;
uint32_t oldTail = tail.load(std::memory_order_relaxed);

do {
newTail = oldTail + 1;
if (GetIndex(newTail) == GetIndex(headPosition.load(std::memory_order_acquire))) {
LOGI("The queue has reached its maximum capacity, capacity: %u!\n", QUEUE_SIZE);
if (getIndex(oldTail) == getIndex(headPosition.load(std::memory_order_acquire))) {
LOGI("The queue has reached its maximum capacity, capacity: %u!\n", _capacity);
return false;
}
newTail = oldTail + 1;
} while (!tail.compare_exchange_weak(oldTail, newTail, std::memory_order_acq_rel,
std::memory_order_relaxed));

queuePool[GetIndex(oldTail)] = std::move(element);
queuePool[getIndex(oldTail)] = std::move(element);

uint32_t newTailPosition = 0;
uint32_t oldTailPosition = tailPosition.load(std::memory_order_relaxed);
Expand All @@ -115,6 +115,7 @@ class LockFreeQueue {

private:
T* queuePool = nullptr;
uint32_t _capacity = 0;
#ifdef DISABLE_ALIGNAS
// head indicates the position after requesting space.
std::atomic<uint32_t> head = {0};
Expand All @@ -134,6 +135,10 @@ class LockFreeQueue {
// tailPosition indicates the position after filling data.
alignas(CACHELINE_SIZE) std::atomic<uint32_t> tailPosition = {1};
#endif

// Wraps a position counter into a slot index of queuePool. Relies on _capacity being a power of 2
// so that the bitwise AND behaves like a modulo.
uint32_t getIndex(uint32_t position) {
  const uint32_t slotMask = _capacity - 1;
  return position & slotMask;
}
};

} // namespace tgfx
63 changes: 36 additions & 27 deletions src/core/utils/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,42 +34,51 @@ std::shared_ptr<Task> Task::Run(std::function<void()> block) {
// Constructs a Task around the given code block. The callable is taken by value and moved into the
// `block` member, so the Task holds its own copy.
Task::Task(std::function<void()> block) : block(std::move(block)) {
}

// Returns the current value of the _executing flag, read while holding the Task's mutex.
bool Task::executing() {
  std::lock_guard<std::mutex> guard(locker);
  const bool stillExecuting = _executing;
  return stillExecuting;
}

// Returns the current value of the _cancelled flag, read while holding the Task's mutex.
bool Task::cancelled() {
  std::lock_guard<std::mutex> guard(locker);
  const bool wasCancelled = _cancelled;
  return wasCancelled;
}

// Returns true when the Task is neither marked as executing nor marked as cancelled. Both flags
// are read while holding the Task's mutex.
bool Task::finished() {
  std::lock_guard<std::mutex> guard(locker);
  const bool executingOrCancelled = _executing || _cancelled;
  return !executingOrCancelled;
}

void Task::wait() {
std::unique_lock<std::mutex> autoLock(locker);
if (!_executing) {
auto oldStatus = taskStatus.load(std::memory_order_relaxed);
if (oldStatus == TaskStatus::cancelled || oldStatus == TaskStatus::finished) {
return;
}
condition.wait(autoLock);
// If wait() is called from the thread pool, all threads might block, leaving no thread to execute
// this task. To avoid deadlock, execute the task directly on the current thread if it's queued.
if (oldStatus == TaskStatus::queuing) {
if (taskStatus.compare_exchange_weak(oldStatus, TaskStatus::executing,
std::memory_order_acq_rel, std::memory_order_relaxed)) {
block();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这句之前加一行注释:If wait() is called from the thread pool, all threads might block, leaving no thread to execute this task. To avoid deadlock, execute the task directly on the current thread if it's queued.

oldStatus = TaskStatus::executing;
while (!taskStatus.compare_exchange_weak(
oldStatus, TaskStatus::finished, std::memory_order_acq_rel, std::memory_order_relaxed))
;
return;
}
}
std::unique_lock<std::mutex> autoLock(locker);
if (taskStatus.load(std::memory_order_acquire) == TaskStatus::executing) {
condition.wait(autoLock);
}
}

void Task::cancel() {
std::unique_lock<std::mutex> autoLock(locker);
if (!_executing) {
return;
auto currentStatus = taskStatus.load(std::memory_order_relaxed);
if (currentStatus == TaskStatus::queuing) {
while (!taskStatus.compare_exchange_weak(currentStatus, TaskStatus::cancelled,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里不能while,如果不是queuing状态,就是不能cancel了。

std::memory_order_acq_rel, std::memory_order_relaxed))
;
}
_executing = false;
_cancelled = true;
}

void Task::execute() {
block();
std::lock_guard<std::mutex> auoLock(locker);
_executing = false;
condition.notify_all();
auto oldStatus = taskStatus.load(std::memory_order_relaxed);
if (oldStatus == TaskStatus::queuing &&
taskStatus.compare_exchange_weak(oldStatus, TaskStatus::executing, std::memory_order_acq_rel,
std::memory_order_relaxed)) {
block();
oldStatus = TaskStatus::executing;
while (!taskStatus.compare_exchange_weak(oldStatus, TaskStatus::finished,
std::memory_order_acq_rel, std::memory_order_relaxed))
;
std::unique_lock<std::mutex> autoLock(locker);
condition.notify_all();
}
}
} // namespace tgfx
26 changes: 14 additions & 12 deletions src/core/utils/TaskGroup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

namespace tgfx {
// NOTE(review): presumably how long an idle worker thread waits for a new task before giving up —
// the usage is not visible in this excerpt; confirm against popTask()/RunLoop().
static constexpr auto THREAD_TIMEOUT = std::chrono::seconds(10);
// Capacity requested for the lock-free queue that stores the worker thread pointers.
static constexpr uint32_t THREAD_POOL_SIZE = 32;
// Capacity requested for the lock-free queue that stores the pending tasks.
static constexpr uint32_t TASK_QUEUE_SIZE = 1024;

int GetCPUCores() {
int cpuCores = 0;
Expand All @@ -44,9 +46,6 @@ int GetCPUCores() {
return cpuCores;
}

static const int CPUCores = GetCPUCores();
static const int MaxThreads = CPUCores > 16 ? 16 : CPUCores;

TaskGroup* TaskGroup::GetInstance() {
static auto& taskGroup = *new TaskGroup();
return &taskGroup;
Expand Down Expand Up @@ -76,15 +75,18 @@ void OnAppExit() {
}

TaskGroup::TaskGroup() {
threads = new LockFreeQueue<std::thread*>(THREAD_POOL_SIZE);
tasks = new LockFreeQueue<std::shared_ptr<Task>>(TASK_QUEUE_SIZE);
std::atexit(OnAppExit);
threads.resize(static_cast<size_t>(MaxThreads), nullptr);
}

bool TaskGroup::checkThreads() {
static const int CPUCores = GetCPUCores();
static const int MaxThreads = CPUCores > 16 ? 16 : CPUCores;
if (waitingThreads == 0 && totalThreads < MaxThreads) {
auto thread = new (std::nothrow) std::thread(&TaskGroup::RunLoop, this);
if (thread) {
threads[static_cast<size_t>(totalThreads)] = thread;
threads->enqueue(thread);
totalThreads++;
}
} else {
Expand All @@ -100,7 +102,7 @@ bool TaskGroup::pushTask(std::shared_ptr<Task> task) {
if (exited || !checkThreads()) {
return false;
}
if (!tasks.enqueue(std::move(task))) {
if (!tasks->enqueue(std::move(task))) {
return false;
}
if (waitingThreads > 0) {
Expand All @@ -112,7 +114,7 @@ bool TaskGroup::pushTask(std::shared_ptr<Task> task) {
std::shared_ptr<Task> TaskGroup::popTask() {
std::unique_lock<std::mutex> autoLock(locker);
while (!exited) {
auto task = tasks.dequeue();
auto task = tasks->dequeue();
if (task) {
return task;
}
Expand All @@ -129,12 +131,12 @@ std::shared_ptr<Task> TaskGroup::popTask() {
void TaskGroup::exit() {
exited = true;
condition.notify_all();
for (int i = 0; i < totalThreads; i++) {
auto thread = threads[static_cast<size_t>(i)];
if (thread) {
ReleaseThread(thread);
}
std::thread* thread = nullptr;
while ((thread = threads->dequeue()) != nullptr) {
ReleaseThread(thread);
}
delete threads;
delete tasks;
totalThreads = 0;
waitingThreads = 0;
}
Expand Down
4 changes: 2 additions & 2 deletions src/core/utils/TaskGroup.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ class TaskGroup {
std::atomic_int totalThreads = 0;
std::atomic_bool exited = false;
std::atomic_int waitingThreads = 0;
LockFreeQueue<std::shared_ptr<Task>> tasks = {};
std::vector<std::thread*> threads = {};
LockFreeQueue<std::shared_ptr<Task>>* tasks = nullptr;
LockFreeQueue<std::thread*>* threads = nullptr;
static TaskGroup* GetInstance();
static void RunLoop(TaskGroup* taskGroup);

Expand Down
Loading