Skip to content

Commit

Permalink
Use lock-free queue in multi-threads. (#428)
Browse files Browse the repository at this point in the history
  • Loading branch information
kevingpqi123 authored Jan 16, 2025
1 parent e4d0550 commit 836aed9
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 101 deletions.
1 change: 0 additions & 1 deletion include/tgfx/core/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ class Task {
std::function<void()> block = nullptr;

explicit Task(std::function<void()> block);
bool removeTask();
void execute();

friend class TaskGroup;
Expand Down
139 changes: 139 additions & 0 deletions src/core/utils/LockFreeQueue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/////////////////////////////////////////////////////////////////////////////////////////////////
//
// Tencent is pleased to support the open source community by making tgfx available.
//
// Copyright (C) 2025 THL A29 Limited, a Tencent company. All rights reserved.
//
// Licensed under the BSD 3-Clause License (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// https://opensource.org/licenses/BSD-3-Clause
//
// unless required by applicable law or agreed to in writing, software distributed under the
// license is distributed on an "as is" basis, without warranties or conditions of any kind,
// either express or implied. see the license for the specific language governing permissions
// and limitations under the license.
//
/////////////////////////////////////////////////////////////////////////////////////////////////

#pragma once
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include "Log.h"

#if __APPLE__
#include <TargetConditionals.h>
#if defined(__IPHONE_OS_VERSION_MIN_REQUIRED) && __IPHONE_OS_VERSION_MIN_REQUIRED < 110000
#define DISABLE_ALIGNAS 1
#endif
#endif

namespace tgfx {
static constexpr size_t CACHELINE_SIZE = 64;
// QUEUE_SIZE needs to be a power of 2, otherwise the implementation of GetIndex needs to be changed.
static constexpr uint32_t QUEUE_SIZE = 1024;

inline uint32_t GetIndex(uint32_t position) {
return position & (QUEUE_SIZE - 1);
}

template <typename T>
class LockFreeQueue {
public:
LockFreeQueue() {
queuePool = reinterpret_cast<T*>(std::calloc(QUEUE_SIZE, sizeof(T)));
if (queuePool == nullptr) {
LOGE("LockFreeQueue init Failed!\n");
return;
}
}

~LockFreeQueue() {
if (queuePool) {
std::free(queuePool);
queuePool = nullptr;
}
}

T dequeue() {
if (queuePool == nullptr) {
return nullptr;
}
uint32_t newHead = 0;
uint32_t oldHead = head.load(std::memory_order_relaxed);
T element = nullptr;

do {
newHead = oldHead + 1;
if (newHead == tailPosition.load(std::memory_order_acquire)) {
return nullptr;
}
element = queuePool[GetIndex(newHead)];
} while (!head.compare_exchange_weak(oldHead, newHead, std::memory_order_acq_rel,
std::memory_order_relaxed));

queuePool[GetIndex(newHead)] = nullptr;

uint32_t newHeadPosition = 0;
uint32_t oldHeadPosition = headPosition.load(std::memory_order_relaxed);
do {
newHeadPosition = oldHeadPosition + 1;
} while (!headPosition.compare_exchange_weak(
oldHeadPosition, newHeadPosition, std::memory_order_acq_rel, std::memory_order_relaxed));
return element;
}

bool enqueue(const T& element) {
if (queuePool == nullptr) {
return false;
}
uint32_t newTail = 0;
uint32_t oldTail = tail.load(std::memory_order_relaxed);

do {
newTail = oldTail + 1;
if (GetIndex(newTail) == GetIndex(headPosition.load(std::memory_order_acquire))) {
LOGI("The queue has reached its maximum capacity, capacity: %u!\n", QUEUE_SIZE);
return false;
}
} while (!tail.compare_exchange_weak(oldTail, newTail, std::memory_order_acq_rel,
std::memory_order_relaxed));

queuePool[GetIndex(oldTail)] = std::move(element);

uint32_t newTailPosition = 0;
uint32_t oldTailPosition = tailPosition.load(std::memory_order_relaxed);
do {
newTailPosition = oldTailPosition + 1;
} while (!tailPosition.compare_exchange_weak(
oldTailPosition, newTailPosition, std::memory_order_acq_rel, std::memory_order_relaxed));
return true;
}

private:
T* queuePool = nullptr;
#ifdef DISABLE_ALIGNAS
// head indicates the position after requesting space.
std::atomic<uint32_t> head = {0};
// headPosition indicates the position after release data.
std::atomic<uint32_t> headPosition = {0};
// tail indicates the position after requesting space.
std::atomic<uint32_t> tail = {1};
// tailPosition indicates the position after filling data.
std::atomic<uint32_t> tailPosition = {1};
#else
// head indicates the position after requesting space.
alignas(CACHELINE_SIZE) std::atomic<uint32_t> head = {0};
// headPosition indicates the position after release data.
alignas(CACHELINE_SIZE) std::atomic<uint32_t> headPosition = {0};
// tail indicates the position after requesting space.
alignas(CACHELINE_SIZE) std::atomic<uint32_t> tail = {1};
// tailPosition indicates the position after filling data.
alignas(CACHELINE_SIZE) std::atomic<uint32_t> tailPosition = {1};
#endif
};

} // namespace tgfx
19 changes: 2 additions & 17 deletions src/core/utils/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
/////////////////////////////////////////////////////////////////////////////////////////////////

#include "tgfx/core/Task.h"
#include "core/utils/Log.h"
#include "core/utils/TaskGroup.h"

namespace tgfx {
Expand Down Expand Up @@ -55,14 +54,6 @@ void Task::wait() {
if (!_executing) {
return;
}
// Try to remove the task from the queue. Execute it directly on the current thread if the task is
// not in the queue. This is to avoid the deadlock situation.
if (removeTask()) {
block();
_executing = false;
condition.notify_all();
return;
}
condition.wait(autoLock);
}

Expand All @@ -71,14 +62,8 @@ void Task::cancel() {
if (!_executing) {
return;
}
if (removeTask()) {
_executing = false;
_cancelled = true;
}
}

bool Task::removeTask() {
return TaskGroup::GetInstance()->removeTask(this);
_executing = false;
_cancelled = true;
}

void Task::execute() {
Expand Down
111 changes: 34 additions & 77 deletions src/core/utils/TaskGroup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

#include "TaskGroup.h"
#include <algorithm>
#include <atomic>
#include <cstdio>
#include <cstdlib>
#include <string>
#include "core/utils/Log.h"
Expand All @@ -30,7 +28,6 @@

namespace tgfx {
static constexpr auto THREAD_TIMEOUT = std::chrono::seconds(10);
static constexpr uint32_t InvalidThreadNumber = 0;

int GetCPUCores() {
int cpuCores = 0;
Expand All @@ -47,31 +44,19 @@ int GetCPUCores() {
return cpuCores;
}

uint32_t GetThreadNumber() {
static std::atomic<uint32_t> nextID{1};
uint32_t number;
do {
number = nextID.fetch_add(1, std::memory_order_relaxed);
} while (number == InvalidThreadNumber);
return number;
}

std::string GetThreadName() {
char threadName[10] = {'\0'};
snprintf(threadName, 10, "Thread_%d", GetThreadNumber());
return threadName;
}
static const int CPUCores = GetCPUCores();
static const int MaxThreads = CPUCores > 16 ? 16 : CPUCores;

TaskGroup* TaskGroup::GetInstance() {
static auto& taskGroup = *new TaskGroup();
return &taskGroup;
}

void TaskGroup::RunLoop(TaskGroup* taskGroup) {
while (true) {
while (!taskGroup->exited) {
auto task = taskGroup->popTask();
if (!task) {
break;
if (task == nullptr) {
continue;
}
task->execute();
}
Expand All @@ -92,93 +77,65 @@ void OnAppExit() {

TaskGroup::TaskGroup() {
std::atexit(OnAppExit);
threads.resize(static_cast<size_t>(MaxThreads), nullptr);
}

bool TaskGroup::checkThreads() {
static const int CPUCores = GetCPUCores();
static const int MaxThreads = CPUCores > 16 ? 16 : CPUCores;
while (!timeoutThreads.empty()) {
auto threadID = timeoutThreads.back();
timeoutThreads.pop_back();
auto result = std::find_if(threads.begin(), threads.end(),
[=](std::thread* thread) { return thread->get_id() == threadID; });
if (result != threads.end()) {
ReleaseThread(*result);
threads.erase(result);
if (waitingThreads == 0 && totalThreads < MaxThreads) {
auto thread = new (std::nothrow) std::thread(&TaskGroup::RunLoop, this);
if (thread) {
threads[static_cast<size_t>(totalThreads)] = thread;
totalThreads++;
}
}
auto totalThreads = static_cast<int>(threads.size());
if (activeThreads < totalThreads || totalThreads >= MaxThreads) {
} else {
return true;
}
auto thread = new (std::nothrow) std::thread(&TaskGroup::RunLoop, this);
if (thread) {
activeThreads++;
threads.push_back(thread);
// LOGI("TaskGroup: A task thread is created, the current number of threads : %lld",
// threads.size());
}
return !threads.empty();
return totalThreads > 0;
}

bool TaskGroup::pushTask(std::shared_ptr<Task> task) {
std::lock_guard<std::mutex> autoLock(locker);
#if defined(TGFX_BUILD_FOR_WEB) && !defined(__EMSCRIPTEN_PTHREADS__)
return false;
#endif
if (exited || !checkThreads()) {
return false;
}
tasks.push_back(std::move(task));
condition.notify_one();
if (!tasks.enqueue(std::move(task))) {
return false;
}
if (waitingThreads > 0) {
condition.notify_one();
}
return true;
}

std::shared_ptr<Task> TaskGroup::popTask() {
std::unique_lock<std::mutex> autoLock(locker);
activeThreads--;
while (!exited) {
if (tasks.empty()) {
auto status = condition.wait_for(autoLock, THREAD_TIMEOUT);
if (exited || status == std::cv_status::timeout) {
auto threadID = std::this_thread::get_id();
timeoutThreads.push_back(threadID);
// LOGI("TaskGroup: A task thread is exited, the current number of threads : %lld",
// threads.size() - expiredThreads.size());
return nullptr;
}
} else {
auto task = tasks.front();
tasks.pop_front();
activeThreads++;
// LOGI("TaskGroup: A task is running, the current active threads : %lld",
// activeThreads);
auto task = tasks.dequeue();
if (task) {
return task;
}
waitingThreads++;
auto status = condition.wait_for(autoLock, THREAD_TIMEOUT);
waitingThreads--;
if (exited || status == std::cv_status::timeout) {
return nullptr;
}
}
return nullptr;
}

bool TaskGroup::removeTask(Task* target) {
std::lock_guard<std::mutex> autoLock(locker);
auto position = std::find_if(tasks.begin(), tasks.end(),
[=](std::shared_ptr<Task> task) { return task.get() == target; });
if (position == tasks.end()) {
return false;
}
tasks.erase(position);
return true;
}

void TaskGroup::exit() {
locker.lock();
exited = true;
tasks.clear();
condition.notify_all();
locker.unlock();
for (auto& thread : threads) {
ReleaseThread(thread);
for (int i = 0; i < totalThreads; i++) {
auto thread = threads[static_cast<size_t>(i)];
if (thread) {
ReleaseThread(thread);
}
}
threads.clear();
totalThreads = 0;
waitingThreads = 0;
}
} // namespace tgfx
11 changes: 5 additions & 6 deletions src/core/utils/TaskGroup.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,27 +23,26 @@
#include <mutex>
#include <thread>
#include <vector>
#include "LockFreeQueue.h"
#include "tgfx/core/Task.h"

namespace tgfx {
class TaskGroup {
private:
std::mutex locker = {};
std::condition_variable condition = {};
int activeThreads = 0;
bool exited = false;
std::list<std::shared_ptr<Task>> tasks = {};
std::atomic_int totalThreads = 0;
std::atomic_bool exited = false;
std::atomic_int waitingThreads = 0;
LockFreeQueue<std::shared_ptr<Task>> tasks = {};
std::vector<std::thread*> threads = {};
std::vector<std::thread::id> timeoutThreads = {};

static TaskGroup* GetInstance();
static void RunLoop(TaskGroup* taskGroup);

TaskGroup();
bool checkThreads();
bool pushTask(std::shared_ptr<Task> task);
std::shared_ptr<Task> popTask();
bool removeTask(Task* task);
void exit();

friend class Task;
Expand Down

0 comments on commit 836aed9

Please sign in to comment.