From 836aed9428f1c1bf0b0f0e5d74ec9e4ed4cdb208 Mon Sep 17 00:00:00 2001 From: kevingpqi123 Date: Thu, 16 Jan 2025 14:13:23 +0800 Subject: [PATCH] Use lock-free queue in multi-threads. (#428) --- include/tgfx/core/Task.h | 1 - src/core/utils/LockFreeQueue.h | 139 +++++++++++++++++++++++++++++++++ src/core/utils/Task.cpp | 19 +---- src/core/utils/TaskGroup.cpp | 111 ++++++++------------------ src/core/utils/TaskGroup.h | 11 ++- 5 files changed, 180 insertions(+), 101 deletions(-) create mode 100644 src/core/utils/LockFreeQueue.h diff --git a/include/tgfx/core/Task.h b/include/tgfx/core/Task.h index be84f6ec..05e6c86b 100644 --- a/include/tgfx/core/Task.h +++ b/include/tgfx/core/Task.h @@ -74,7 +74,6 @@ class Task { std::function block = nullptr; explicit Task(std::function block); - bool removeTask(); void execute(); friend class TaskGroup; diff --git a/src/core/utils/LockFreeQueue.h b/src/core/utils/LockFreeQueue.h new file mode 100644 index 00000000..4232ddd7 --- /dev/null +++ b/src/core/utils/LockFreeQueue.h @@ -0,0 +1,139 @@ +///////////////////////////////////////////////////////////////////////////////////////////////// +// +// Tencent is pleased to support the open source community by making tgfx available. +// +// Copyright (C) 2025 THL A29 Limited, a Tencent company. All rights reserved. +// +// Licensed under the BSD 3-Clause License (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// https://opensource.org/licenses/BSD-3-Clause +// +// unless required by applicable law or agreed to in writing, software distributed under the +// license is distributed on an "as is" basis, without warranties or conditions of any kind, +// either express or implied. see the license for the specific language governing permissions +// and limitations under the license. +// +///////////////////////////////////////////////////////////////////////////////////////////////// + +#pragma once +#include +#include +#include +#include +#include +#include "Log.h" + +#if __APPLE__ +#include +#if defined(__IPHONE_OS_VERSION_MIN_REQUIRED) && __IPHONE_OS_VERSION_MIN_REQUIRED < 110000 +#define DISABLE_ALIGNAS 1 +#endif +#endif + +namespace tgfx { +static constexpr size_t CACHELINE_SIZE = 64; +// QUEUE_SIZE needs to be a power of 2, otherwise the implementation of GetIndex needs to be changed. +static constexpr uint32_t QUEUE_SIZE = 1024; + +inline uint32_t GetIndex(uint32_t position) { + return position & (QUEUE_SIZE - 1); +} + +template +class LockFreeQueue { + public: + LockFreeQueue() { + queuePool = reinterpret_cast(std::calloc(QUEUE_SIZE, sizeof(T))); + if (queuePool == nullptr) { + LOGE("LockFreeQueue init Failed!\n"); + return; + } + } + + ~LockFreeQueue() { + if (queuePool) { + std::free(queuePool); + queuePool = nullptr; + } + } + + T dequeue() { + if (queuePool == nullptr) { + return nullptr; + } + uint32_t newHead = 0; + uint32_t oldHead = head.load(std::memory_order_relaxed); + T element = nullptr; + + do { + newHead = oldHead + 1; + if (newHead == tailPosition.load(std::memory_order_acquire)) { + return nullptr; + } + element = queuePool[GetIndex(newHead)]; + } while (!head.compare_exchange_weak(oldHead, newHead, std::memory_order_acq_rel, + std::memory_order_relaxed)); + + queuePool[GetIndex(newHead)] = nullptr; + + uint32_t newHeadPosition = 0; + uint32_t oldHeadPosition = headPosition.load(std::memory_order_relaxed); + do { + newHeadPosition = oldHeadPosition + 1; + } while (!headPosition.compare_exchange_weak( + oldHeadPosition, newHeadPosition, std::memory_order_acq_rel, std::memory_order_relaxed)); + return element; + } + + bool enqueue(const T& element) { + if (queuePool == nullptr) { + return false; + } + uint32_t newTail = 0; + uint32_t oldTail = tail.load(std::memory_order_relaxed); + + do { + newTail = oldTail + 1; + if (GetIndex(newTail) == GetIndex(headPosition.load(std::memory_order_acquire))) { + LOGI("The queue has reached its maximum capacity, capacity: %u!\n", QUEUE_SIZE); + return false; + } + } while (!tail.compare_exchange_weak(oldTail, newTail, std::memory_order_acq_rel, + std::memory_order_relaxed)); + + queuePool[GetIndex(oldTail)] = std::move(element); + + uint32_t newTailPosition = 0; + uint32_t oldTailPosition = tailPosition.load(std::memory_order_relaxed); + do { + newTailPosition = oldTailPosition + 1; + } while (!tailPosition.compare_exchange_weak( + oldTailPosition, newTailPosition, std::memory_order_acq_rel, std::memory_order_relaxed)); + return true; + } + + private: + T* queuePool = nullptr; +#ifdef DISABLE_ALIGNAS + // head indicates the position after requesting space. + std::atomic head = {0}; + // headPosition indicates the position after release data. + std::atomic headPosition = {0}; + // tail indicates the position after requesting space. + std::atomic tail = {1}; + // tailPosition indicates the position after filling data. + std::atomic tailPosition = {1}; +#else + // head indicates the position after requesting space. + alignas(CACHELINE_SIZE) std::atomic head = {0}; + // headPosition indicates the position after release data. + alignas(CACHELINE_SIZE) std::atomic headPosition = {0}; + // tail indicates the position after requesting space. + alignas(CACHELINE_SIZE) std::atomic tail = {1}; + // tailPosition indicates the position after filling data. + alignas(CACHELINE_SIZE) std::atomic tailPosition = {1}; +#endif +}; + +} // namespace tgfx diff --git a/src/core/utils/Task.cpp b/src/core/utils/Task.cpp index 3fbf7bfe..ec406860 100644 --- a/src/core/utils/Task.cpp +++ b/src/core/utils/Task.cpp @@ -17,7 +17,6 @@ ///////////////////////////////////////////////////////////////////////////////////////////////// #include "tgfx/core/Task.h" -#include "core/utils/Log.h" #include "core/utils/TaskGroup.h" namespace tgfx { @@ -55,14 +54,6 @@ void Task::wait() { if (!_executing) { return; } - // Try to remove the task from the queue. Execute it directly on the current thread if the task is - // not in the queue. This is to avoid the deadlock situation. - if (removeTask()) { - block(); - _executing = false; - condition.notify_all(); - return; - } condition.wait(autoLock); } @@ -71,14 +62,8 @@ void Task::cancel() { if (!_executing) { return; } - if (removeTask()) { - _executing = false; - _cancelled = true; - } -} - -bool Task::removeTask() { - return TaskGroup::GetInstance()->removeTask(this); + _executing = false; + _cancelled = true; } void Task::execute() { diff --git a/src/core/utils/TaskGroup.cpp b/src/core/utils/TaskGroup.cpp index 380dbc5b..f1edfa54 100644 --- a/src/core/utils/TaskGroup.cpp +++ b/src/core/utils/TaskGroup.cpp @@ -18,8 +18,6 @@ #include "TaskGroup.h" #include -#include -#include #include #include #include "core/utils/Log.h" @@ -30,7 +28,6 @@ namespace tgfx { static constexpr auto THREAD_TIMEOUT = std::chrono::seconds(10); -static constexpr uint32_t InvalidThreadNumber = 0; int GetCPUCores() { int cpuCores = 0; @@ -47,20 +44,8 @@ int GetCPUCores() { return cpuCores; } -uint32_t GetThreadNumber() { - static std::atomic nextID{1}; - uint32_t number; - do { - number = nextID.fetch_add(1, std::memory_order_relaxed); - } while (number == InvalidThreadNumber); - return number; -} - -std::string GetThreadName() { - char threadName[10] = {'\0'}; - snprintf(threadName, 10, "Thread_%d", GetThreadNumber()); - return threadName; -} +static const int CPUCores = GetCPUCores(); +static const int MaxThreads = CPUCores > 16 ? 16 : CPUCores; TaskGroup* TaskGroup::GetInstance() { static auto& taskGroup = *new TaskGroup(); @@ -68,10 +53,10 @@ TaskGroup* TaskGroup::GetInstance() { } void TaskGroup::RunLoop(TaskGroup* taskGroup) { - while (true) { + while (!taskGroup->exited) { auto task = taskGroup->popTask(); - if (!task) { - break; + if (task == nullptr) { + continue; } task->execute(); } @@ -92,93 +77,65 @@ void OnAppExit() { TaskGroup::TaskGroup() { std::atexit(OnAppExit); + threads.resize(static_cast(MaxThreads), nullptr); } bool TaskGroup::checkThreads() { - static const int CPUCores = GetCPUCores(); - static const int MaxThreads = CPUCores > 16 ? 16 : CPUCores; - while (!timeoutThreads.empty()) { - auto threadID = timeoutThreads.back(); - timeoutThreads.pop_back(); - auto result = std::find_if(threads.begin(), threads.end(), - [=](std::thread* thread) { return thread->get_id() == threadID; }); - if (result != threads.end()) { - ReleaseThread(*result); - threads.erase(result); + if (waitingThreads == 0 && totalThreads < MaxThreads) { + auto thread = new (std::nothrow) std::thread(&TaskGroup::RunLoop, this); + if (thread) { + threads[static_cast(totalThreads)] = thread; + totalThreads++; } - } - auto totalThreads = static_cast(threads.size()); - if (activeThreads < totalThreads || totalThreads >= MaxThreads) { + } else { return true; } - auto thread = new (std::nothrow) std::thread(&TaskGroup::RunLoop, this); - if (thread) { - activeThreads++; - threads.push_back(thread); - // LOGI("TaskGroup: A task thread is created, the current number of threads : %lld", - // threads.size()); - } - return !threads.empty(); + return totalThreads > 0; } bool TaskGroup::pushTask(std::shared_ptr task) { - std::lock_guard autoLock(locker); #if defined(TGFX_BUILD_FOR_WEB) && !defined(__EMSCRIPTEN_PTHREADS__) return false; #endif if (exited || !checkThreads()) { return false; } - tasks.push_back(std::move(task)); - condition.notify_one(); + if (!tasks.enqueue(std::move(task))) { + return false; + } + if (waitingThreads > 0) { + condition.notify_one(); + } return true; } std::shared_ptr TaskGroup::popTask() { std::unique_lock autoLock(locker); - activeThreads--; while (!exited) { - if (tasks.empty()) { - auto status = condition.wait_for(autoLock, THREAD_TIMEOUT); - if (exited || status == std::cv_status::timeout) { - auto threadID = std::this_thread::get_id(); - timeoutThreads.push_back(threadID); - // LOGI("TaskGroup: A task thread is exited, the current number of threads : %lld", - // threads.size() - expiredThreads.size()); - return nullptr; - } - } else { - auto task = tasks.front(); - tasks.pop_front(); - activeThreads++; - // LOGI("TaskGroup: A task is running, the current active threads : %lld", - // activeThreads); + auto task = tasks.dequeue(); + if (task) { return task; } + waitingThreads++; + auto status = condition.wait_for(autoLock, THREAD_TIMEOUT); + waitingThreads--; + if (exited || status == std::cv_status::timeout) { + return nullptr; + } } return nullptr; } -bool TaskGroup::removeTask(Task* target) { - std::lock_guard autoLock(locker); - auto position = std::find_if(tasks.begin(), tasks.end(), - [=](std::shared_ptr task) { return task.get() == target; }); - if (position == tasks.end()) { - return false; - } - tasks.erase(position); - return true; -} - void TaskGroup::exit() { - locker.lock(); exited = true; - tasks.clear(); condition.notify_all(); - locker.unlock(); - for (auto& thread : threads) { - ReleaseThread(thread); + for (int i = 0; i < totalThreads; i++) { + auto thread = threads[static_cast(i)]; + if (thread) { + ReleaseThread(thread); + } } - threads.clear(); + totalThreads = 0; + waitingThreads = 0; } } // namespace tgfx diff --git a/src/core/utils/TaskGroup.h b/src/core/utils/TaskGroup.h index 34307907..6d977ea4 100644 --- a/src/core/utils/TaskGroup.h +++ b/src/core/utils/TaskGroup.h @@ -23,6 +23,7 @@ #include #include #include +#include "LockFreeQueue.h" #include "tgfx/core/Task.h" namespace tgfx { @@ -30,12 +31,11 @@ class TaskGroup { private: std::mutex locker = {}; std::condition_variable condition = {}; - int activeThreads = 0; - bool exited = false; - std::list> tasks = {}; + std::atomic_int totalThreads = 0; + std::atomic_bool exited = false; + std::atomic_int waitingThreads = 0; + LockFreeQueue> tasks = {}; std::vector threads = {}; - std::vector timeoutThreads = {}; - static TaskGroup* GetInstance(); static void RunLoop(TaskGroup* taskGroup); @@ -43,7 +43,6 @@ class TaskGroup { bool checkThreads(); bool pushTask(std::shared_ptr task); std::shared_ptr popTask(); - bool removeTask(Task* task); void exit(); friend class Task;