Skip to content

Commit

Permalink
add coro mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
microcai committed Jan 7, 2025
1 parent 26911ba commit 7a4fc8b
Show file tree
Hide file tree
Showing 4 changed files with 277 additions and 3 deletions.
225 changes: 222 additions & 3 deletions include/ucoro/inter_coro.hpp
Original file line number Diff line number Diff line change
@@ -1,17 +1,32 @@


// inter coro communication tools

// aka. channels

#pragma once

#include <deque>
#include <mutex>
#include "awaitable.hpp"

#if __has_include(<asio.hpp>)
#include <asio.hpp>
#define __HAS_ASIO_HPP
#endif

#if __has_include(<boost/asio.hpp>)
#include <boost/asio.hpp>
#define __HAS_BOOST_ASIO_HPP
#endif

namespace ucoro::communication
{


/*
* channel<>, 协程版的异步列队。但是线程不安全。
* 使用此channel的多个协程需要被同一个线程调度。
* 否则会导致数据损坏
*/
template<typename T>
class channel
{
Expand Down Expand Up @@ -47,7 +62,7 @@ class channel

constexpr void await_resume() noexcept {}

void await_suspend(std::coroutine_handle<> handle)
void await_suspend(std::coroutine_handle<> handle) noexcept
{
wait_on.push_back(handle);
}
Expand Down Expand Up @@ -90,6 +105,210 @@ class channel
std::deque<std::coroutine_handle<>> m_push_awaiting;
};

template<typename T>
concept has_post_member = requires (T t)
{
t.post([](){});
};

template<typename T>
concept is_io_context = requires (T t)
{
t.get_executor().execute([](){});
};


// 协程版的 mutex. 使用场景为多线程调度多协程的情况下进行数据保护。
// 虽然并不十分的建议将协程调度在多线程环境,但是总有特殊情况需要使用。
// 注:单线程调度协程并不等同于只能使用一个线程开发应用。而是使用一个线程
// 一个 io_context 的形式。这样在同一个 io_context 下运行的代码,就无需
// 锁保护。也就是说,本 mutex 的使用场景是多线程跑一个 io_context,然后
// 使用协程,也就是 M:N 调度下进行数据保护。
class mutex
{
mutex(const mutex&) = delete;
mutex(mutex&&) = delete;

struct base_executor
{
virtual void post(std::coroutine_handle<> handle) = 0;
virtual ~base_executor(){};
};

struct dummy_executor : public base_executor
{
virtual void post(std::coroutine_handle<> handle) override
{
return handle.resume();
}
};

template<typename Executor>
struct post_executor : public base_executor
{
Executor& executor;
virtual void post(std::coroutine_handle<> handle) override
{
executor.post(handle);
}

post_executor(Executor& ex)
: executor(ex)
{}
};

template<typename Executor>
struct std_executor : public base_executor
{
Executor executor;
virtual void post(std::coroutine_handle<> handle) override
{
executor.execute(handle);
}

std_executor(Executor&& ex)
: executor(ex)
{}
};

#ifdef __HAS_ASIO_HPP
struct asio_io_context : public base_executor
{
asio::io_context& executor;
virtual void post(std::coroutine_handle<> handle) override
{
asio::post(executor, handle);
}

asio_io_context(asio::io_context& io) : executor(io) {}
};
#endif

#ifdef __HAS_BOOST_ASIO_HPP
struct boost_asio_io_context : public base_executor
{
boost::asio::io_context& executor;
virtual void post(std::coroutine_handle<> handle) override
{
boost::asio::post(executor, handle);
}
boost_asio_io_context(boost::asio::io_context& io) : executor(io) {}
};
#endif

struct any_executor
{
std::unique_ptr<base_executor> impl_;
void post(std::coroutine_handle<> handle)
{
impl_->post(handle);
}

any_executor()
{
impl_.reset(new dummy_executor);
}

#ifdef __HAS_ASIO_HPP
any_executor(asio::io_context& ex)
{
impl_.reset(new asio_io_context{ex});
}
#endif
#ifdef __HAS_BOOST_ASIO_HPP
any_executor(boost::asio::io_context& ex)
{
impl_.reset(new boost_asio_io_context{ex});
}
#endif
template<typename Ex> requires (has_post_member<Ex>)
any_executor(Ex& ex)
{
impl_.reset(new post_executor{ex});
}

template<typename Ex> requires (is_io_context<Ex>)
any_executor(Ex& ex)
{
impl_.reset(new std_executor{ex.get_executor()});
}
};

struct lock_waiter
{
any_executor& executor;
std::coroutine_handle<> coro_handle;

void operator()()
{
executor.post(coro_handle);
}
};

struct await_locker
{
mutex* parent;
any_executor& executor;
constexpr bool await_ready() noexcept { return false; }

constexpr void await_resume() noexcept {}

void await_suspend(std::coroutine_handle<> corohandle) noexcept
{
parent->m_awaiter.emplace_back(executor, corohandle);
parent->m_thread_lock.unlock();
}
};

public:
mutex()
{}

awaitable<void> lock(any_executor executor)
{
if (is_locked.test_and_set())
{
m_thread_lock.lock();
if (is_locked.test_and_set())
{
// lock 失败。等待释放.
co_await await_locker{this, executor};
// 释放成功。同时已被本协程上锁.
}
else
{
// 第二次尝试后锁成功.
m_thread_lock.unlock();
}
}

// lock 成功。返回.
co_return;
}

void unlock()
{
m_thread_lock.lock();
// 要释放锁,先检查是否有等待者.
if (m_awaiter.empty())
{
is_locked.clear();
m_thread_lock.unlock();
return;
}
// 有等待者,唤醒其中之一.
auto top_waiter = m_awaiter.front();
m_awaiter.pop_front();
m_thread_lock.unlock();

return top_waiter();
}
private:
std::mutex m_thread_lock;
std::atomic_flag is_locked;
std::deque<lock_waiter> m_awaiter;
};


}

Expand Down
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ find_package(Boost 1.60 COMPONENTS thread system atomic)
if(Boost_FOUND)
add_subdirectory(test_asio)
add_subdirectory(test5)
add_subdirectory(test9)
endif(Boost_FOUND)

add_subdirectory(test3)
Expand Down
6 changes: 6 additions & 0 deletions tests/test9/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@

add_executable(test9 test.cpp)
target_link_libraries(test9 ucoro Boost::boost Boost::thread Boost::system)

add_test(NAME test9 COMMAND test9)
set_target_properties(test9 PROPERTIES FOLDER "ucoro_tests")
48 changes: 48 additions & 0 deletions tests/test9/test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@

#include "ucoro/awaitable.hpp"
#include "ucoro/inter_coro.hpp"
#include <iostream>
#include <boost/asio.hpp>

ucoro::communication::mutex mtx;
boost::asio::io_context io;

ucoro::awaitable<void> coroA()
{
boost::asio::executor_work_guard<decltype(io.get_executor())> work{io.get_executor()};
for (int i =0; i < 200; i++)
{
std::cout << "A locking...\n";
co_await mtx.lock(io);
std::cout << "coroA locked\n";
std::this_thread::sleep_for(std::chrono::microseconds(30));
mtx.unlock();
std::cout << "coroA unlocked\n";
}
}

ucoro::awaitable<void> coroB()
{
boost::asio::executor_work_guard<decltype(io.get_executor())> work{io.get_executor()};
for (int i =0; i < 200; i++)
{
std::cout << "B locking...\n";
co_await mtx.lock(io);
std::cout << "coroB locked\n";
std::this_thread::sleep_for(std::chrono::microseconds(130));
mtx.unlock();
std::cout << "coroB unlocked\n";
}
}

int main(int argc, char** argv)
{
boost::asio::post(io, [](){coroA().detach(&io);});
boost::asio::post(io, [](){coroB().detach(&io);});

std::thread(&boost::asio::io_context::run, &io).detach();
std::thread(&boost::asio::io_context::run, &io).detach();

io.run();
return 0;
}

0 comments on commit 7a4fc8b

Please sign in to comment.