Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update lock-free queue for multi-threading, add remove task process in the wait method of Task. #436

Merged
merged 8 commits into from
Jan 21, 2025

Conversation

kevingpqi123
Copy link
Collaborator

No description provided.

/**
* Returns true if the Task is currently waiting to execute its code block.
*/
bool waiting();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

不需要这个接口

@@ -67,10 +73,10 @@ class Task {
void wait();

private:
enum class TaskStatus { waiting, executing, finished, cancelled };
Copy link
Collaborator

@domchen domchen Jan 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

不能叫waiting,有歧义。用queuing.

LockFreeQueue() {
queuePool = reinterpret_cast<T*>(std::calloc(QUEUE_SIZE, sizeof(T)));
/**
* capacity needs to be a power of 2, otherwise the implementation of getIndex needs to be changed.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The capacity

* capacity needs to be a power of 2, otherwise the implementation of getIndex needs to be changed.
* @param capacity
*/
explicit LockFreeQueue(uint32_t capacity) : _capacity(capacity) {
Copy link
Collaborator

@domchen domchen Jan 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里要给capacity加个ASSERT()检查是不是2的指数,或者自动吸附到指数。

Comment on lines 37 to 51
bool Task::waiting() {
return status.load(std::memory_order_relaxed) == TaskStatus::waiting;
}

bool Task::executing() {
std::lock_guard<std::mutex> autoLock(locker);
return _executing;
return status.load(std::memory_order_relaxed) == TaskStatus::executing;
}

bool Task::cancelled() {
std::lock_guard<std::mutex> autoLock(locker);
return _cancelled;
return status.load(std::memory_order_relaxed) == TaskStatus::cancelled;
}

bool Task::finished() {
std::lock_guard<std::mutex> autoLock(locker);
return !_executing && !_cancelled;
return status.load(std::memory_order_relaxed) == TaskStatus::finished;
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这一排的状态变量可以统一为一个status属性对外暴露。不用单独一个个声明了。

return;
auto currentStatus = status.load(std::memory_order_relaxed);
if (currentStatus == TaskStatus::waiting) {
status.store(TaskStatus::cancelled, std::memory_order_relaxed);
Copy link
Collaborator

@domchen domchen Jan 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

现在没有锁保护了,这里也需要有个CAS,不然不是线程安全的。不能直接就赋值。

if (oldStatus == TaskStatus::waiting) {
if (status.compare_exchange_weak(oldStatus, TaskStatus::executing, std::memory_order_acq_rel,
std::memory_order_relaxed)) {
block();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这句之前加一行注释:If wait() is called from the thread pool, all threads might block, leaving no thread to execute this task. To avoid deadlock, execute the task directly on the current thread if it's queued.

* Indicates the current status of the Task. To ensure thread safety, read the data using the
* following method: taskStatus.load(std::memory_order_relaxed).
*/
std::atomic<TaskStatus> taskStatus = TaskStatus::queuing;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

加个只读方法,不能直接这么暴露出来。

return;
auto currentStatus = taskStatus.load(std::memory_order_relaxed);
if (currentStatus == TaskStatus::queuing) {
while (!taskStatus.compare_exchange_weak(currentStatus, TaskStatus::cancelled,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里不能while,如果不是queuing状态,就是不能cancel了。

* power of 2 larger than the capacity.
* @param capacity
*/
explicit LockFreeQueue(uint32_t capacity) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个Capacity还有一种写法,可以声明到template的参数里, template <typename T, size_t N>, 这样就可以直接声明了。

/**
* Return the current status of the Task.
*/
TaskStatus taskStatus() const;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

就叫status吧

@domchen domchen merged commit ce19888 into main Jan 21, 2025
8 checks passed
@domchen domchen deleted the feature/kevingpqi_threads branch January 21, 2025 06:11
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants