Skip to content

Commit

Permalink
fixes bug
Browse files Browse the repository at this point in the history
  • Loading branch information
actboy168 committed May 6, 2024
1 parent 923779e commit 297c0ae
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 41 deletions.
2 changes: 1 addition & 1 deletion bee/net/event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ namespace bee::net {
case socket::recv_status::wait:
case socket::recv_status::close:
case socket::recv_status::failed:
e.clear(std::memory_order_seq_cst);
return;
case socket::recv_status::success:
break;
default:
std::unreachable();
}
}
e.clear(std::memory_order_seq_cst);
}

fd_t event::fd() const noexcept {
Expand Down
54 changes: 34 additions & 20 deletions binding/lua_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ extern "C" {
namespace bee::lua_channel {
class channel {
public:
using box = std::shared_ptr<channel>;
using value_type = void*;
using box = std::shared_ptr<channel>;

bool init() noexcept {
if (!ev.open()) {
Expand All @@ -29,23 +28,42 @@ namespace bee::lua_channel {
net::fd_t fd() const noexcept {
return ev.fd();
}
void push(value_type data) noexcept {
void push(lua_State* L, int from) noexcept {
void* data = seri_pack(L, from, NULL);
std::unique_lock<spinlock> lk(mutex);
queue.push(data);
ev.set();
}
bool pop(value_type& data) noexcept {
int pop(lua_State* L) noexcept {
void* data;
{
std::unique_lock<spinlock> lk(mutex);
if (queue.empty()) {
ev.clear();
lua_pushboolean(L, 0);
return 1;
}
data = queue.front();
queue.pop();
}
lua_pushboolean(L, 1);
return 1 + seri_unpackptr(L, data);
}
void clear() noexcept {
std::unique_lock<spinlock> lk(mutex);
if (queue.empty()) {
return false;
for (;;) {
if (queue.empty()) {
ev.clear();
return;
}
void* data = queue.front();
free(data);
queue.pop();
}
data = queue.front();
queue.pop();
return true;
}

private:
std::queue<value_type> queue;
std::queue<void*> queue;
spinlock mutex;
net::event ev;
};
Expand All @@ -67,6 +85,9 @@ namespace bee::lua_channel {
}
void clear() noexcept {
std::unique_lock<spinlock> lk(mutex);
for (const auto& [_, c] : channels) {
c->clear();
}
channels.clear();
}
channel::box query(zstring_view name) noexcept {
Expand All @@ -87,21 +108,14 @@ namespace bee::lua_channel {
static channelmgr g_channel;

static int lchannel_push(lua_State* L) {
auto& bc = lua::checkudata<channel::box>(L, 1);
void* buffer = seri_pack(L, 1, NULL);
bc->push(buffer);
auto& bc = lua::checkudata<channel::box>(L, 1);
bc->push(L, 1);
return 0;
}

static int lchannel_pop(lua_State* L) {
auto& bc = lua::checkudata<channel::box>(L, 1);
void* data;
if (!bc->pop(data)) {
lua_pushboolean(L, 0);
return 1;
}
lua_pushboolean(L, 1);
return 1 + seri_unpackptr(L, data);
return bc->pop(L);
}

static int lchannel_fd(lua_State* L) {
Expand Down
50 changes: 30 additions & 20 deletions test/test_channel.lua
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ end

local test_channel = lt.test "channel"

function test_channel:test_channel_create()
function test_channel:test_create()
channel.reset()
lt.assertErrorMsgEquals("Can't query channel 'test'", channel.query, "test")
channel.create "test"
Expand Down Expand Up @@ -166,21 +166,28 @@ function test_channel:test_fd()
epfd:event_add(req:fd(), epoll.EPOLLIN)
local function dispatch(ok, what, ...)
if not ok then
return true
return 1
end
if what == "exit" then
os.exit()
return
return 0
end
res:push(what, ...)
end
for _, event in epfd:wait() do
if event & (epoll.EPOLLERR | epoll.EPOLLHUP) ~= 0 then
assert(false, "unknown error")
return
end
if event & epoll.EPOLLIN ~= 0 then
while not dispatch(req:pop()) do
while true do
for _, event in epfd:wait() do
if event & (epoll.EPOLLERR | epoll.EPOLLHUP) ~= 0 then
assert(false, "unknown error")
return
end
if event & epoll.EPOLLIN ~= 0 then
while true do
local r = dispatch(req:pop())
if r == 0 then
return
elseif r == 1 then
break
end
end
end
end
end
Expand All @@ -189,15 +196,18 @@ function test_channel:test_fd()
epfd:event_add(res:fd(), epoll.EPOLLIN)
local function test_ok(...)
req:push(...)
for _, event in epfd:wait() do
if event & (epoll.EPOLLERR | epoll.EPOLLHUP) ~= 0 then
lt.failure("unknown error")
end
if event & epoll.EPOLLIN ~= 0 then
local r = table.pack(res:pop())
if r[1] == true then
lt.assertEquals(r, table.pack(true, ...))
break
local expected = table.pack(true, ...)
while true do
for _, event in epfd:wait() do
if event & (epoll.EPOLLERR | epoll.EPOLLHUP) ~= 0 then
lt.failure("unknown error")
end
if event & epoll.EPOLLIN ~= 0 then
local actual = table.pack(res:pop())
if actual[1] == true then
lt.assertEquals(actual, expected)
return
end
end
end
end
Expand Down

0 comments on commit 297c0ae

Please sign in to comment.