Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update lock-free queue for multi-threading, add remove task process in the wait method of Task. #436

Merged
merged 8 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions include/tgfx/core/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#pragma once

#include <atomic>
#include <condition_variable>
#include <functional>
#include <memory>
Expand All @@ -38,6 +39,11 @@ class Task {
*/
static std::shared_ptr<Task> Run(std::function<void()> block);

/**
* Returns true if the Task is currently waiting to execute its code block.
*/
bool waiting();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

不需要这个接口


/**
* Returns true if the Task is currently executing its code block.
*/
Expand Down Expand Up @@ -67,10 +73,10 @@ class Task {
void wait();

private:
enum class TaskStatus { waiting, executing, finished, cancelled };
Copy link
Collaborator

@domchen domchen Jan 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

不能叫waiting,有歧义。用queuing.

std::mutex locker = {};
std::condition_variable condition = {};
bool _executing = true;
bool _cancelled = false;
std::atomic<TaskStatus> status = TaskStatus::waiting;
std::function<void()> block = nullptr;

explicit Task(std::function<void()> block);
Expand Down
40 changes: 18 additions & 22 deletions src/core/utils/LockFreeQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,18 @@

namespace tgfx {
static constexpr size_t CACHELINE_SIZE = 64;
// QUEUE_SIZE needs to be a power of 2, otherwise the implementation of GetIndex needs to be changed.
static constexpr uint32_t QUEUE_SIZE = 1024;

inline uint32_t GetIndex(uint32_t position) {
return position & (QUEUE_SIZE - 1);
}

template <typename T>
class LockFreeQueue {
public:
LockFreeQueue() {
queuePool = reinterpret_cast<T*>(std::calloc(QUEUE_SIZE, sizeof(T)));
/**
* The capacity needs to be a power of 2; otherwise, the implementation of getIndex needs to be changed.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The capacity

* @param capacity The maximum number of elements the queue can hold; must be a power of 2.
*/
explicit LockFreeQueue(uint32_t capacity) : _capacity(capacity) {
Copy link
Collaborator

@domchen domchen Jan 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里要给capacity加个ASSERT()检查是不是2的指数,或者自动吸附到指数。

queuePool = reinterpret_cast<T*>(std::calloc(capacity, sizeof(T)));
if (queuePool == nullptr) {
LOGE("LockFreeQueue init Failed!\n");
return;
ABORT("LockFreeQueue init Failed!\n");
}
}

Expand All @@ -59,9 +56,6 @@ class LockFreeQueue {
}

T dequeue() {
if (queuePool == nullptr) {
return nullptr;
}
uint32_t newHead = 0;
uint32_t oldHead = head.load(std::memory_order_relaxed);
T element = nullptr;
Expand All @@ -71,11 +65,11 @@ class LockFreeQueue {
if (newHead == tailPosition.load(std::memory_order_acquire)) {
return nullptr;
}
element = queuePool[GetIndex(newHead)];
element = queuePool[getIndex(newHead)];
} while (!head.compare_exchange_weak(oldHead, newHead, std::memory_order_acq_rel,
std::memory_order_relaxed));

queuePool[GetIndex(newHead)] = nullptr;
queuePool[getIndex(newHead)] = nullptr;

uint32_t newHeadPosition = 0;
uint32_t oldHeadPosition = headPosition.load(std::memory_order_relaxed);
Expand All @@ -87,22 +81,19 @@ class LockFreeQueue {
}

bool enqueue(const T& element) {
if (queuePool == nullptr) {
return false;
}
uint32_t newTail = 0;
uint32_t oldTail = tail.load(std::memory_order_relaxed);

do {
newTail = oldTail + 1;
if (GetIndex(newTail) == GetIndex(headPosition.load(std::memory_order_acquire))) {
LOGI("The queue has reached its maximum capacity, capacity: %u!\n", QUEUE_SIZE);
if (getIndex(oldTail) == getIndex(headPosition.load(std::memory_order_acquire))) {
LOGI("The queue has reached its maximum capacity, capacity: %u!\n", _capacity);
return false;
}
newTail = oldTail + 1;
} while (!tail.compare_exchange_weak(oldTail, newTail, std::memory_order_acq_rel,
std::memory_order_relaxed));

queuePool[GetIndex(oldTail)] = std::move(element);
queuePool[getIndex(oldTail)] = std::move(element);

uint32_t newTailPosition = 0;
uint32_t oldTailPosition = tailPosition.load(std::memory_order_relaxed);
Expand All @@ -115,6 +106,7 @@ class LockFreeQueue {

private:
T* queuePool = nullptr;
uint32_t _capacity = 0;
#ifdef DISABLE_ALIGNAS
// head indicates the position after requesting space.
std::atomic<uint32_t> head = {0};
Expand All @@ -134,6 +126,10 @@ class LockFreeQueue {
// tailPosition indicates the position after filling data.
alignas(CACHELINE_SIZE) std::atomic<uint32_t> tailPosition = {1};
#endif

// Maps an ever-increasing position onto a slot of the ring buffer. This
// relies on _capacity being a power of 2, so the bitmask below is
// equivalent to position % _capacity.
uint32_t getIndex(uint32_t position) {
  const uint32_t mask = _capacity - 1;
  return position & mask;
}
};

} // namespace tgfx
51 changes: 33 additions & 18 deletions src/core/utils/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,42 +34,57 @@ std::shared_ptr<Task> Task::Run(std::function<void()> block) {
Task::Task(std::function<void()> block) : block(std::move(block)) {
}

// Reports whether this task is still queued, i.e. its code block has not
// started executing. Relaxed ordering matches the other status accessors.
bool Task::waiting() {
  const auto currentStatus = status.load(std::memory_order_relaxed);
  return currentStatus == TaskStatus::waiting;
}

bool Task::executing() {
std::lock_guard<std::mutex> autoLock(locker);
return _executing;
return status.load(std::memory_order_relaxed) == TaskStatus::executing;
}

bool Task::cancelled() {
std::lock_guard<std::mutex> autoLock(locker);
return _cancelled;
return status.load(std::memory_order_relaxed) == TaskStatus::cancelled;
}

bool Task::finished() {
std::lock_guard<std::mutex> autoLock(locker);
return !_executing && !_cancelled;
return status.load(std::memory_order_relaxed) == TaskStatus::finished;
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这一排的状态变量可以统一为一个status属性对外暴露。不用单独一个个声明了。


void Task::wait() {
std::unique_lock<std::mutex> autoLock(locker);
if (!_executing) {
auto oldStatus = status.load(std::memory_order_relaxed);
if (oldStatus == TaskStatus::cancelled || oldStatus == TaskStatus::finished) {
return;
}
condition.wait(autoLock);
if (oldStatus == TaskStatus::waiting) {
if (status.compare_exchange_weak(oldStatus, TaskStatus::executing, std::memory_order_acq_rel,
std::memory_order_relaxed)) {
block();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这句之前加一行注释:If wait() is called from the thread pool, all threads might block, leaving no thread to execute this task. To avoid deadlock, execute the task directly on the current thread if it's queued.

status.store(TaskStatus::finished, std::memory_order_relaxed);
return;
}
}
std::unique_lock<std::mutex> autoLock(locker);
if (status.load(std::memory_order_acquire) == TaskStatus::executing) {
condition.wait(autoLock);
}
}

void Task::cancel() {
std::unique_lock<std::mutex> autoLock(locker);
if (!_executing) {
return;
auto currentStatus = status.load(std::memory_order_relaxed);
if (currentStatus == TaskStatus::waiting) {
status.store(TaskStatus::cancelled, std::memory_order_relaxed);
Copy link
Collaborator

@domchen domchen Jan 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

现在没有锁保护了,这里也需要有个CAS,不然不是线程安全的。不能直接就赋值。

}
_executing = false;
_cancelled = true;
}

void Task::execute() {
block();
std::lock_guard<std::mutex> auoLock(locker);
_executing = false;
condition.notify_all();
auto oldStatus = status.load(std::memory_order_relaxed);
if (oldStatus == TaskStatus::waiting &&
status.compare_exchange_weak(oldStatus, TaskStatus::executing, std::memory_order_acq_rel,
std::memory_order_relaxed)) {
block();
status.store(TaskStatus::finished, std::memory_order_relaxed);
std::unique_lock<std::mutex> autoLock(locker);
condition.notify_all();
}
}
} // namespace tgfx
26 changes: 14 additions & 12 deletions src/core/utils/TaskGroup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

namespace tgfx {
static constexpr auto THREAD_TIMEOUT = std::chrono::seconds(10);
static constexpr uint32_t THREAD_POOL_SIZE = 32;
static constexpr uint32_t TASK_QUEUE_SIZE = 1024;

int GetCPUCores() {
int cpuCores = 0;
Expand All @@ -44,9 +46,6 @@ int GetCPUCores() {
return cpuCores;
}

static const int CPUCores = GetCPUCores();
static const int MaxThreads = CPUCores > 16 ? 16 : CPUCores;

TaskGroup* TaskGroup::GetInstance() {
static auto& taskGroup = *new TaskGroup();
return &taskGroup;
Expand Down Expand Up @@ -76,15 +75,18 @@ void OnAppExit() {
}

TaskGroup::TaskGroup() {
threads = new LockFreeQueue<std::thread*>(THREAD_POOL_SIZE);
tasks = new LockFreeQueue<std::shared_ptr<Task>>(TASK_QUEUE_SIZE);
std::atexit(OnAppExit);
threads.resize(static_cast<size_t>(MaxThreads), nullptr);
}

bool TaskGroup::checkThreads() {
static const int CPUCores = GetCPUCores();
static const int MaxThreads = CPUCores > 16 ? 16 : CPUCores;
if (waitingThreads == 0 && totalThreads < MaxThreads) {
auto thread = new (std::nothrow) std::thread(&TaskGroup::RunLoop, this);
if (thread) {
threads[static_cast<size_t>(totalThreads)] = thread;
threads->enqueue(thread);
totalThreads++;
}
} else {
Expand All @@ -100,7 +102,7 @@ bool TaskGroup::pushTask(std::shared_ptr<Task> task) {
if (exited || !checkThreads()) {
return false;
}
if (!tasks.enqueue(std::move(task))) {
if (!tasks->enqueue(std::move(task))) {
return false;
}
if (waitingThreads > 0) {
Expand All @@ -112,7 +114,7 @@ bool TaskGroup::pushTask(std::shared_ptr<Task> task) {
std::shared_ptr<Task> TaskGroup::popTask() {
std::unique_lock<std::mutex> autoLock(locker);
while (!exited) {
auto task = tasks.dequeue();
auto task = tasks->dequeue();
if (task) {
return task;
}
Expand All @@ -129,12 +131,12 @@ std::shared_ptr<Task> TaskGroup::popTask() {
void TaskGroup::exit() {
exited = true;
condition.notify_all();
for (int i = 0; i < totalThreads; i++) {
auto thread = threads[static_cast<size_t>(i)];
if (thread) {
ReleaseThread(thread);
}
std::thread* thread = nullptr;
while ((thread = threads->dequeue()) != nullptr) {
ReleaseThread(thread);
}
delete threads;
delete tasks;
totalThreads = 0;
waitingThreads = 0;
}
Expand Down
4 changes: 2 additions & 2 deletions src/core/utils/TaskGroup.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ class TaskGroup {
std::atomic_int totalThreads = 0;
std::atomic_bool exited = false;
std::atomic_int waitingThreads = 0;
LockFreeQueue<std::shared_ptr<Task>> tasks = {};
std::vector<std::thread*> threads = {};
LockFreeQueue<std::shared_ptr<Task>>* tasks = nullptr;
LockFreeQueue<std::thread*>* threads = nullptr;
static TaskGroup* GetInstance();
static void RunLoop(TaskGroup* taskGroup);

Expand Down
Loading