Skip to content

Commit

Permalink
Use lock-free queue in multi-threads.
Browse files Browse the repository at this point in the history
  • Loading branch information
kevingpqi123 committed Jan 10, 2025
1 parent f8b6f3a commit 3463fa6
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 94 deletions.
12 changes: 8 additions & 4 deletions include/tgfx/core/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,16 @@

#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>

namespace tgfx {
enum class TaskState {
Queued,
Executing,
Finished,
Canceled,
};

class TaskGroup;

/**
Expand Down Expand Up @@ -69,12 +75,10 @@ class Task {
private:
std::mutex locker = {};
std::condition_variable condition = {};
bool _executing = true;
bool _cancelled = false;
std::atomic<TaskState> state = TaskState::Queued;
std::function<void()> block = nullptr;

explicit Task(std::function<void()> block);
bool removeTask();
void execute();

friend class TaskGroup;
Expand Down
110 changes: 110 additions & 0 deletions src/core/utils/LockFreeQueue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/////////////////////////////////////////////////////////////////////////////////////////////////
//
// Tencent is pleased to support the open source community by making libpag available.
//
// Copyright (C) 2025 THL A29 Limited, a Tencent company. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// unless required by applicable law or agreed to in writing, software distributed under the
// license is distributed on an "as is" basis, without warranties or conditions of any kind,
// either express or implied. see the license for the specific language governing permissions
// and limitations under the license.
//
/////////////////////////////////////////////////////////////////////////////////////////////////

#pragma once
#include <algorithm>
#include <atomic>
#include <cstdint>

namespace tgfx {
template <typename T>
class LockFreeQueue {
public:
using ElementType = T;
LockFreeQueue() : _size(0) {
header = tail = deleteTail = new QueueNode;
}
LockFreeQueue& operator=(const LockFreeQueue& other) = delete;
LockFreeQueue(const LockFreeQueue& other) = delete;

~LockFreeQueue() {
QueueNode* pTail = deleteTail;
while (pTail != nullptr) {
QueueNode* pNode = pTail;
pTail = pNode->nextNode;
delete pNode;
}
}

ElementType dequeue() {
QueueNode* tailNode = tail;
ElementType element = ElementType();
do {
if (tailNode->nextNode == nullptr) return element;
} while (!std::atomic_compare_exchange_weak(&tail, &tailNode, tailNode->nextNode));
QueueNode* popNode = tailNode->nextNode;
element = std::move(popNode->item);
popNode->item = ElementType();
if (deleteTail->nextNode && deleteTail->nextNode->nextNode) {
QueueNode* deleteNode = deleteTail;
deleteTail = deleteTail->nextNode;
delete deleteNode;
deleteNode = nullptr;
}
if (_size > 0) {
_size--;
}
return element;
}

bool enqueue(const ElementType& element) {
QueueNode* newNode = new (std::nothrow) QueueNode(element);
if (newNode == nullptr) {
return false;
}
QueueNode* currentHeader = header;
while (!std::atomic_compare_exchange_weak(&header, &currentHeader, newNode))
;
std::atomic_thread_fence(std::memory_order_seq_cst);
currentHeader->nextNode = newNode;
_size++;
return true;
}

bool empty() const {
return _size == 0;
}

void clear() {
while (dequeue() != nullptr)
;
}

uint64_t size() const {
return _size;
}

private:
struct QueueNode {
QueueNode() : nextNode(nullptr) {
}
explicit QueueNode(const ElementType& element) : nextNode(nullptr), item(element) {
}
explicit QueueNode(ElementType& element) : nextNode(nullptr), item(std::move(element)) {
}
QueueNode* volatile nextNode;
ElementType item;
};

std::atomic<QueueNode*> header = nullptr;
std::atomic<QueueNode*> tail = nullptr;
QueueNode* deleteTail = nullptr;
std::atomic_uint64_t _size = 0;
};

} // namespace tgfx
34 changes: 11 additions & 23 deletions src/core/utils/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
/////////////////////////////////////////////////////////////////////////////////////////////////

#include "tgfx/core/Task.h"
#include "core/utils/Log.h"
#include "core/utils/TaskGroup.h"

namespace tgfx {
Expand All @@ -36,55 +35,44 @@ Task::Task(std::function<void()> block) : block(std::move(block)) {
}

bool Task::executing() {
std::lock_guard<std::mutex> autoLock(locker);
return _executing;
return state == TaskState::Executing;
}

bool Task::cancelled() {
std::lock_guard<std::mutex> autoLock(locker);
return _cancelled;
return state == TaskState::Canceled;
}

bool Task::finished() {
std::lock_guard<std::mutex> autoLock(locker);
return !_executing && !_cancelled;
return state == TaskState::Finished;
}

void Task::wait() {
std::unique_lock<std::mutex> autoLock(locker);
if (!_executing) {
if (state == TaskState::Finished || state == TaskState::Canceled) {
return;
}
// Try to remove the task from the queue. Execute it directly on the current thread if the task is
// not in the queue. This is to avoid the deadlock situation.
if (removeTask()) {
if (state == TaskState::Queued) {
state = TaskState::Executing;
block();
_executing = false;
state = TaskState::Finished;
condition.notify_all();
return;
}
condition.wait(autoLock);
}

void Task::cancel() {
std::unique_lock<std::mutex> autoLock(locker);
if (!_executing) {
return;
}
if (removeTask()) {
_executing = false;
_cancelled = true;
if (state == TaskState::Queued) {
state = TaskState::Canceled;
}
}

bool Task::removeTask() {
return TaskGroup::GetInstance()->removeTask(this);
}

void Task::execute() {
state = TaskState::Executing;
block();
std::lock_guard<std::mutex> auoLock(locker);
_executing = false;
state = TaskState::Finished;
condition.notify_all();
}
} // namespace tgfx
86 changes: 25 additions & 61 deletions src/core/utils/TaskGroup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

#include "TaskGroup.h"
#include <algorithm>
#include <atomic>
#include <cstdio>
#include <cstdlib>
#include <string>
#include "core/utils/Log.h"
Expand All @@ -30,7 +28,7 @@

namespace tgfx {
static constexpr auto THREAD_TIMEOUT = std::chrono::seconds(10);
static constexpr uint32_t InvalidThreadNumber = 0;
static constexpr int MAX_TASK_COUNT = 100;

int GetCPUCores() {
int cpuCores = 0;
Expand All @@ -47,20 +45,8 @@ int GetCPUCores() {
return cpuCores;
}

uint32_t GetThreadNumber() {
static std::atomic<uint32_t> nextID{1};
uint32_t number;
do {
number = nextID.fetch_add(1, std::memory_order_relaxed);
} while (number == InvalidThreadNumber);
return number;
}

std::string GetThreadName() {
char threadName[10] = {'\0'};
snprintf(threadName, 10, "Thread_%d", GetThreadNumber());
return threadName;
}
static const int CPUCores = GetCPUCores();
static const int MaxThreads = CPUCores > 16 ? 16 : CPUCores;

TaskGroup* TaskGroup::GetInstance() {
static auto& taskGroup = *new TaskGroup();
Expand All @@ -70,10 +56,12 @@ TaskGroup* TaskGroup::GetInstance() {
void TaskGroup::RunLoop(TaskGroup* taskGroup) {
while (true) {
auto task = taskGroup->popTask();
if (!task) {
break;
if (task == nullptr) {
continue;
}
if (task->state == TaskState::Queued) {
task->execute();
}
task->execute();
}
}

Expand All @@ -95,90 +83,66 @@ TaskGroup::TaskGroup() {
}

bool TaskGroup::checkThreads() {
static const int CPUCores = GetCPUCores();
static const int MaxThreads = CPUCores > 16 ? 16 : CPUCores;
while (!timeoutThreads.empty()) {
auto threadID = timeoutThreads.back();
timeoutThreads.pop_back();
auto result = std::find_if(threads.begin(), threads.end(),
[=](std::thread* thread) { return thread->get_id() == threadID; });
if (result != threads.end()) {
ReleaseThread(*result);
threads.erase(result);
}
}
auto totalThreads = static_cast<int>(threads.size());
if (activeThreads < totalThreads || totalThreads >= MaxThreads) {
return true;
}
auto thread = new (std::nothrow) std::thread(&TaskGroup::RunLoop, this);
if (thread) {
activeThreads++;
threads.push_back(thread);
threads.enqueue(thread);
// LOGI("TaskGroup: A task thread is created, the current number of threads : %lld",
// threads.size());
}
return !threads.empty();
}

bool TaskGroup::pushTask(std::shared_ptr<Task> task) {
std::lock_guard<std::mutex> autoLock(locker);
#if defined(TGFX_BUILD_FOR_WEB) && !defined(__EMSCRIPTEN_PTHREADS__)
return false;
#endif
if (exited || !checkThreads()) {
return false;
}
tasks.push_back(std::move(task));
condition.notify_one();
if (tasks.size() >= MAX_TASK_COUNT) {
return false;
}
tasks.enqueue(std::move(task));
if (waitDataCount > 0) {
condition.notify_one();
}
return true;
}

std::shared_ptr<Task> TaskGroup::popTask() {
std::unique_lock<std::mutex> autoLock(locker);
activeThreads--;
while (!exited) {
if (tasks.empty()) {
waitDataCount++;
auto status = condition.wait_for(autoLock, THREAD_TIMEOUT);
waitDataCount--;
if (exited || status == std::cv_status::timeout) {
auto threadID = std::this_thread::get_id();
timeoutThreads.push_back(threadID);
// LOGI("TaskGroup: A task thread is exited, the current number of threads : %lld",
// threads.size() - expiredThreads.size());
return nullptr;
}
} else {
auto task = tasks.front();
tasks.pop_front();
activeThreads++;
// LOGI("TaskGroup: A task is running, the current active threads : %lld",
// activeThreads);
std::shared_ptr<Task> task = tasks.dequeue();
while (task && task->state != TaskState::Queued) {
task = tasks.dequeue();
}
return task;
}
}
return nullptr;
}

bool TaskGroup::removeTask(Task* target) {
std::lock_guard<std::mutex> autoLock(locker);
auto position = std::find_if(tasks.begin(), tasks.end(),
[=](std::shared_ptr<Task> task) { return task.get() == target; });
if (position == tasks.end()) {
return false;
}
tasks.erase(position);
return true;
}

void TaskGroup::exit() {
locker.lock();
exited = true;
tasks.clear();
condition.notify_all();
locker.unlock();
for (auto& thread : threads) {
auto thread = threads.dequeue();
while (thread != nullptr) {
ReleaseThread(thread);
thread = threads.dequeue();
}
threads.clear();
}
} // namespace tgfx
Loading

0 comments on commit 3463fa6

Please sign in to comment.