Skip to content

Commit

Permalink
Added C++20 coroutine based test.
Browse files Browse the repository at this point in the history
  • Loading branch information
redboltz committed Feb 6, 2024
1 parent 29dedbf commit 7b4aee4
Show file tree
Hide file tree
Showing 5 changed files with 491 additions and 5 deletions.
8 changes: 4 additions & 4 deletions include/async_mqtt/packet/v3_1_1_connect.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ class connect_packet {
bool clean_session,
std::uint16_t keep_alive_sec,
buffer client_id,
optional<buffer> user_name,
optional<buffer> password
optional<buffer> user_name = nullopt,
optional<buffer> password = nullopt
):connect_packet(
clean_session,
keep_alive_sec,
Expand Down Expand Up @@ -112,8 +112,8 @@ class connect_packet {
std::uint16_t keep_alive_sec,
buffer client_id,
optional<will> w,
optional<buffer> user_name,
optional<buffer> password
optional<buffer> user_name = nullopt,
optional<buffer> password = nullopt
)
: fixed_header_{
make_fixed_header(control_packet_type::connect, 0b0000)
Expand Down
1 change: 0 additions & 1 deletion include/async_mqtt/packet/v5_puback.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,6 @@ class basic_puback_packet {

if (reason_code_) {
ret.emplace_back(as::buffer(&*reason_code_, 1));

if (property_length_buf_.size() != 0) {
ret.emplace_back(as::buffer(property_length_buf_.data(), property_length_buf_.size()));
auto props_cbs = async_mqtt::const_buffer_sequence(props_);
Expand Down
1 change: 1 addition & 0 deletions test/unit/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ if("cxx_std_20" IN_LIST CMAKE_CXX_COMPILE_FEATURES)
message(STATUS "C++20 examples added")
list(APPEND check_PROGRAMS
ut_cpp20coro_exec.cpp
ut_cpp20coro_broker.cpp
)
endif()

Expand Down
279 changes: 279 additions & 0 deletions test/unit/cpp20coro_stub_socket.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,279 @@
// Copyright Takatoshi Kondo 2022
//
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

#if !defined(ASYNC_MQTT_CPP20CORO_STUB_SOCKET_HPP)
#define ASYNC_MQTT_CPP20CORO_STUB_SOCKET_HPP

#include <deque>

#include <boost/asio.hpp>
#include <boost/asio/experimental/channel.hpp>

#include <async_mqtt/buffer_to_packet_variant.hpp>

namespace async_mqtt {

namespace as = boost::asio;

struct cpp20coro_stub_socket {
using executor_type = as::any_io_executor;
using packet_iterator_t = packet_iterator<std::vector, as::const_buffer>;
using packet_range = std::pair<packet_iterator_t, packet_iterator_t>;

cpp20coro_stub_socket(
protocol_version version,
as::any_io_executor exe
)
:version_{version},
exe_{force_move(exe)}
{}

cpp20coro_stub_socket(
protocol_version version,
as::io_context& ioc
)
:version_{version},
exe_{ioc.get_executor()}
{}

template <typename CompletionToken>
auto emulate_recv(
packet_variant pv,
CompletionToken&& token
) {
return as::async_initiate<
CompletionToken,
void()
>(
[this](
auto completion_handler,
packet_variant pv
) {
ch_recv_.async_send(
pv,
[completion_handler = force_move(completion_handler)]
(auto) mutable {
force_move(completion_handler)();
}
);
},
token,
force_move(pv)
);
}

template <typename CompletionToken>
auto emulate_close(CompletionToken&& token) {
return emulate_recv(
errc::make_error_code(errc::connection_reset),
std::forward<CompletionToken>(token)
);
}

template <typename CompletionToken>
auto wait_response(CompletionToken&& token) {
return as::async_initiate<
CompletionToken,
void(packet_variant)
>(
[this](
auto completion_handler
) {
std::cout << "waiting" << std::endl;
ch_send_.async_receive(
[completion_handler = force_move(completion_handler)]
(packet_variant pv) mutable {
std::cout << "arrive:" << pv << std::endl;
force_move(completion_handler)(force_move(pv));
}
);
},
token
);
}

#if 0
auto const& lowest_layer() const {
return exe_;
}
auto& lowest_layer() {
return exe_;
}
#endif
auto get_executor() const {
return exe_;
}
auto get_executor() {
return exe_;
}
bool is_open() const {
return open_;
}
void close(error_code&) {
open_ = false;
ch_send_.async_send(
packet_variant{errc::make_error_code(errc::connection_reset)},
[](auto) {
}
);
}

template <typename ConstBufferSequence, typename CompletionToken>
auto async_write_some(
ConstBufferSequence const& buffers,
CompletionToken&& token
) {
return as::async_initiate<
CompletionToken,
void(boost::system::error_code, std::size_t)
>(
[this](
auto completion_handler,
ConstBufferSequence const& buffers
) {
auto it = as::buffers_iterator<ConstBufferSequence>::begin(buffers);
auto end = as::buffers_iterator<ConstBufferSequence>::end(buffers);
auto buf = allocate_buffer(it, end);
auto pv = buffer_to_packet_variant(buf, version_);
as::post(
as::bind_executor(
exe_,
[
this,
completion_handler = force_move(completion_handler),
dis = std::distance(it, end),
pv
] () mutable {
auto exe = as::get_associated_executor(completion_handler);
as::dispatch(
as::bind_executor(
exe,
[
this,
completion_handler = force_move(completion_handler),
dis,
pv
] () mutable {
force_move(completion_handler)(
boost::system::error_code{},
std::size_t(dis)
);
std::cout << "send:" << pv << std::endl;
ch_send_.async_send(
pv,
[](auto) {
std::cout << "send finish" << std::endl;
}
);
}
)
);
}
)
);
},
token,
buffers
);
}

template <typename MutableBufferSequence, typename CompletionToken>
void async_read_some(
MutableBufferSequence const& mb,
CompletionToken&& token
) {
auto partial_read =
[
this,
mb = mb
]
(CompletionToken&& token){
BOOST_ASSERT(pv_r_);
BOOST_ASSERT(static_cast<std::size_t>(std::distance(pv_r_->first, pv_r_->second)) >= mb.size());
as::mutable_buffer mb_copy = mb;
std::copy(
pv_r_->first,
std::next(pv_r_->first, std::ptrdiff_t(mb.size())),
static_cast<char*>(mb_copy.data())
);
std::advance(pv_r_->first, mb.size());
as::post(
as::bind_executor(
as::get_associated_executor(token),
[token = force_move(token), size = mb.size()] () mutable {
token(errc::make_error_code(errc::success), size);
}
)
);
if (pv_r_->first == pv_r_->second) {
pv_r_ = nullopt;
}
};

if (pv_r_) {
partial_read(std::forward<CompletionToken>(token));
return;
}

ch_recv_.async_receive(
[
this,
token = std::forward<CompletionToken>(token),
partial_read
]
(packet_variant pv) mutable {
if (!pv) {
auto exe = as::get_associated_executor(token);
as::post(
as::bind_executor(
exe,
[
token = force_move(token),
code = pv.get<system_error>().code()
] () mutable {
token(code, 0);
}
)
);
return;
}
pv_ = pv;
cbs_ = pv_.const_buffer_sequence();
pv_r_ = make_packet_range(cbs_);
partial_read(force_move(token));
}
);
}

private:
using channel_t = as::experimental::channel<void(packet_variant)>;
protocol_version version_;
as::any_io_executor exe_;
packet_variant pv_;
std::vector<as::const_buffer> cbs_;
optional<packet_range> pv_r_;
bool open_ = true;
channel_t ch_recv_{exe_, 1};
channel_t ch_send_{exe_, 1};
};

template <typename MutableBufferSequence, typename CompletionToken>
void async_read(
cpp20coro_stub_socket& socket,
MutableBufferSequence const& mb,
CompletionToken&& token
) {
socket.async_read_some(mb, std::forward<CompletionToken>(token));
}

inline bool is_close(packet_variant const& pv) {
if (pv) return false; // pv is packet
return pv.get<system_error>().code() == errc::make_error_code(errc::connection_reset);
}

} // namespace async_mqtt

#endif // ASYNC_MQTT_CPP20CORO_STUB_SOCKET_HPP
Loading

0 comments on commit 7b4aee4

Please sign in to comment.