Skip to content

Commit

Permalink
Fixed multiple close problem.
Browse files Browse the repository at this point in the history
close async function is called not only by user but also internally.
Their CompletionToken should be invoked validly. In order to do that,
the mechanism to extend endpoint lifetime is required.
So I made endpoint shared_ptr based design and using shared_from_this().
  • Loading branch information
redboltz committed Dec 11, 2023
1 parent 5a6a899 commit 95c3598
Show file tree
Hide file tree
Showing 44 changed files with 1,510 additions and 1,414 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
## 4.0.0
### breaking changes
- Fixed multiple close problem. In order to do that endpoint become shared_ptr based design. #98, #100, #101, #102
### other updates
- Refined documents. #97
- Added TLS async_shutdown timeout. #99

## 3.0.0
### breaking changes
- Fixed inconsistent function names. #84, #89
Expand Down
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
# http://www.boost.org/LICENSE_1_0.txt)

cmake_minimum_required (VERSION 3.13.0)
project(async_mqtt_iface VERSION 3.0.0)
project(async_mqtt_iface VERSION 4.0.0)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
Expand Down Expand Up @@ -107,7 +107,7 @@ if(DOXYGEN_FOUND)
COMMAND ${CMAKE_COMMAND} -E echo "FILE_PATTERNS = *.hpp" >> ${CMAKE_CURRENT_BINARY_DIR}/Doxyfile
COMMAND ${CMAKE_COMMAND} -E echo "OUTPUT_DIRECTORY = doc" >> ${CMAKE_CURRENT_BINARY_DIR}/Doxyfile
COMMAND ${CMAKE_COMMAND} -E echo "PROJECT_NAME = async_mqtt" >> ${CMAKE_CURRENT_BINARY_DIR}/Doxyfile
COMMAND ${CMAKE_COMMAND} -E echo "PROJECT_NUMBER = 3.0.0" >> ${CMAKE_CURRENT_BINARY_DIR}/Doxyfile
COMMAND ${CMAKE_COMMAND} -E echo "PROJECT_NUMBER = 4.0.0" >> ${CMAKE_CURRENT_BINARY_DIR}/Doxyfile
COMMAND ${CMAKE_COMMAND} -E echo "RECURSIVE = YES" >> ${CMAKE_CURRENT_BINARY_DIR}/Doxyfile
COMMAND ${CMAKE_COMMAND} -E echo "PREDEFINED = _DOXYGEN_ ASYNC_MQTT_USE_TLS ASYNC_MQTT_USE_WS" >> ${CMAKE_CURRENT_BINARY_DIR}/Doxyfile
COMMAND ${CMAKE_COMMAND} -E echo "INPUT = ${CMAKE_CURRENT_SOURCE_DIR}/include/async_mqtt" >> ${CMAKE_CURRENT_BINARY_DIR}/Doxyfile
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

Asynchronous MQTT communication library.

Version 3.0.0 [![Actions Status](https://github.com/redboltz/async_mqtt/workflows/CI/badge.svg)](https://github.com/redboltz/async_mqtt/actions)[![codecov](https://codecov.io/gh/redboltz/async_mqtt/branch/main/graph/badge.svg)](https://codecov.io/gh/redboltz/async_mqtt)
Version 4.0.0 [![Actions Status](https://github.com/redboltz/async_mqtt/workflows/CI/badge.svg)](https://github.com/redboltz/async_mqtt/actions)[![codecov](https://codecov.io/gh/redboltz/async_mqtt/branch/main/graph/badge.svg)](https://codecov.io/gh/redboltz/async_mqtt)

This is Boost.Asio oriented asynchronous MQTT communication library. You can use async_mqtt to develop not only your MQTT client application but also your server (e.g. broker).
Based on https://github.com/redboltz/mqtt_cpp experience, there are many improvements. See overview.
Expand Down
26 changes: 13 additions & 13 deletions example/ep_cb_mqtt_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ int main(int argc, char* argv[]) {
as::io_context ioc;
as::ip::tcp::socket resolve_sock{ioc};
as::ip::tcp::resolver res{resolve_sock.get_executor()};
am::endpoint<am::role::client, am::protocol::mqtt> amep {
auto amep = am::endpoint<am::role::client, am::protocol::mqtt>::create(
am::protocol_version::v3_1_1,
ioc.get_executor()
};
);

std::cout << "start" << std::endl;
std::size_t count = 0;
Expand All @@ -44,7 +44,7 @@ int main(int argc, char* argv[]) {

// Underlying TCP connect
as::async_connect(
amep.next_layer(),
amep->next_layer(),
eps,
[&]
(boost::system::error_code ec, as::ip::tcp::endpoint /*unused*/) {
Expand All @@ -54,7 +54,7 @@ int main(int argc, char* argv[]) {
<< std::endl;
if (ec) return;
// Send MQTT CONNECT
amep.send(
amep->send(
am::v3_1_1::connect_packet{
true, // clean_session
0x1234, // keep_alive
Expand All @@ -70,7 +70,7 @@ int main(int argc, char* argv[]) {
return;
}
// Recv MQTT CONNACK
amep.recv(
amep->recv(
[&]
(am::packet_variant pv) {
if (pv) {
Expand All @@ -82,9 +82,9 @@ int main(int argc, char* argv[]) {
<< " sp:" << p.session_present()
<< std::endl;
// Send MQTT SUBSCRIBE
amep.send(
amep->send(
am::v3_1_1::subscribe_packet{
*amep.acquire_unique_packet_id(),
*amep->acquire_unique_packet_id(),
{ {am::allocate_buffer("topic1"), am::qos::at_most_once} }
},
[&]
Expand All @@ -94,7 +94,7 @@ int main(int argc, char* argv[]) {
return;
}
// Recv MQTT SUBACK
amep.recv(
amep->recv(
[&]
(am::packet_variant pv) {
if (pv) {
Expand All @@ -110,9 +110,9 @@ int main(int argc, char* argv[]) {
}
std::cout << std::endl;
// Send MQTT PUBLISH
amep.send(
amep->send(
am::v3_1_1::publish_packet{
*amep.acquire_unique_packet_id(),
*amep->acquire_unique_packet_id(),
am::allocate_buffer("topic1"),
am::allocate_buffer("payload1"),
am::qos::at_least_once
Expand Down Expand Up @@ -152,11 +152,11 @@ int main(int argc, char* argv[]) {
}
);
if (++count < 2) {
amep.recv(*recv_handler);
amep->recv(*recv_handler);
}
else {
std::cout << "close" << std::endl;
amep.close([]{});
amep->close([]{});
}
}
else {
Expand All @@ -167,7 +167,7 @@ int main(int argc, char* argv[]) {
return;
}
};
amep.recv(*recv_handler);
amep->recv(*recv_handler);
}
);
},
Expand Down
24 changes: 12 additions & 12 deletions example/ep_cpp20coro_mqtt_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ as::awaitable<void>
proc(Executor exe, std::string_view host, std::string_view port) {
as::ip::tcp::socket resolve_sock{exe};
as::ip::tcp::resolver res{exe};
am::endpoint<am::role::client, am::protocol::mqtt> amep {
auto amep = am::endpoint<am::role::client, am::protocol::mqtt>::create(
am::protocol_version::v3_1_1,
exe
};
);
std::cout << "start" << std::endl;

try {
Expand All @@ -35,14 +35,14 @@ proc(Executor exe, std::string_view host, std::string_view port) {

// Underlying TCP connect
co_await as::async_connect(
amep.next_layer(),
amep->next_layer(),
eps,
as::use_awaitable
);
std::cout << "TCP connected" << std::endl;

// Send MQTT CONNECT
if (auto se = co_await amep.send(
if (auto se = co_await amep->send(
am::v3_1_1::connect_packet{
true, // clean_session
0x1234, // keep_alive
Expand All @@ -59,7 +59,7 @@ proc(Executor exe, std::string_view host, std::string_view port) {
}

// Recv MQTT CONNACK
if (am::packet_variant pv = co_await amep.recv(as::use_awaitable)) {
if (am::packet_variant pv = co_await amep->recv(as::use_awaitable)) {
pv.visit(
am::overload {
[&](am::v3_1_1::connack_packet const& p) {
Expand All @@ -84,9 +84,9 @@ proc(Executor exe, std::string_view host, std::string_view port) {
std::vector<am::topic_subopts> sub_entry{
{am::allocate_buffer("topic1"), am::qos::at_most_once}
};
if (auto se = co_await amep.send(
if (auto se = co_await amep->send(
am::v3_1_1::subscribe_packet{
*amep.acquire_unique_packet_id(),
*amep->acquire_unique_packet_id(),
am::force_move(sub_entry) // sub_entry variable is required to avoid g++ bug
},
as::use_awaitable
Expand All @@ -96,7 +96,7 @@ proc(Executor exe, std::string_view host, std::string_view port) {
co_return;
}
// Recv MQTT SUBACK
if (am::packet_variant pv = co_await amep.recv(as::use_awaitable)) {
if (am::packet_variant pv = co_await amep->recv(as::use_awaitable)) {
pv.visit(
am::overload {
[&](am::v3_1_1::suback_packet const& p) {
Expand All @@ -121,9 +121,9 @@ proc(Executor exe, std::string_view host, std::string_view port) {
co_return;
}
// Send MQTT PUBLISH
if (auto se = co_await amep.send(
if (auto se = co_await amep->send(
am::v3_1_1::publish_packet{
*amep.acquire_unique_packet_id(),
*amep->acquire_unique_packet_id(),
am::allocate_buffer("topic1"),
am::allocate_buffer("payload1"),
am::qos::at_least_once
Expand All @@ -136,7 +136,7 @@ proc(Executor exe, std::string_view host, std::string_view port) {
}
// Recv MQTT PUBLISH and PUBACK (order depends on broker)
for (std::size_t count = 0; count != 2; ++count) {
if (am::packet_variant pv = co_await amep.recv(as::use_awaitable)) {
if (am::packet_variant pv = co_await amep->recv(as::use_awaitable)) {
pv.visit(
am::overload {
[&](am::v3_1_1::publish_packet const& p) {
Expand Down Expand Up @@ -169,7 +169,7 @@ proc(Executor exe, std::string_view host, std::string_view port) {
}
}
std::cout << "close" << std::endl;
co_await amep.close(as::use_awaitable);
co_await amep->close(as::use_awaitable);
}
catch (boost::system::system_error const& se) {
std::cout << se.what() << std::endl;
Expand Down
24 changes: 12 additions & 12 deletions example/ep_future_mqtt_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ int main(int argc, char* argv[]) {
as::io_context ioc;
as::ip::tcp::socket resolve_sock{ioc};
as::ip::tcp::resolver res{resolve_sock.get_executor()};
am::endpoint<am::role::client, am::protocol::mqtt> amep {
auto amep = am::endpoint<am::role::client, am::protocol::mqtt>::create(
am::protocol_version::v3_1_1,
ioc.get_executor()
};
);

// async_mqtt thread
auto guard = as::make_work_guard(ioc.get_executor());
Expand Down Expand Up @@ -56,7 +56,7 @@ int main(int argc, char* argv[]) {
auto eps = f_res.get();

auto f_con = as::async_connect(
amep.next_layer(),
amep->next_layer(),
eps,
as::use_future
);
Expand All @@ -65,7 +65,7 @@ int main(int argc, char* argv[]) {

// Send MQTT CONNECT
{
auto fut = amep.send(
auto fut = amep->send(
am::v3_1_1::connect_packet{
true, // clean_session
0x1234, // keep_alive
Expand All @@ -85,7 +85,7 @@ int main(int argc, char* argv[]) {

// Recv MQTT CONNACK
{
auto fut = amep.recv(as::use_future);
auto fut = amep->recv(as::use_future);
auto pv = fut.get(); // get am::packet_variant
if (pv) {
pv.visit(
Expand All @@ -112,9 +112,9 @@ int main(int argc, char* argv[]) {

// Send MQTT SUBSCRIBE
{
auto fut_id = amep.acquire_unique_packet_id(as::use_future);
auto fut_id = amep->acquire_unique_packet_id(as::use_future);
auto pid = fut_id.get();
auto fut = amep.send(
auto fut = amep->send(
am::v3_1_1::subscribe_packet{
*pid,
{ {am::allocate_buffer("topic1"), am::qos::at_most_once} }
Expand All @@ -130,7 +130,7 @@ int main(int argc, char* argv[]) {

// Recv MQTT SUBACK
{
auto fut = amep.recv(as::use_future);
auto fut = amep->recv(as::use_future);
auto pv = fut.get();
if (pv) {
pv.visit(
Expand Down Expand Up @@ -160,9 +160,9 @@ int main(int argc, char* argv[]) {

// Send MQTT PUBLISH
{
auto fut_id = amep.acquire_unique_packet_id(as::use_future);
auto fut_id = amep->acquire_unique_packet_id(as::use_future);
auto pid = fut_id.get();
auto fut = amep.send(
auto fut = amep->send(
am::v3_1_1::publish_packet{
*pid,
am::allocate_buffer("topic1"),
Expand All @@ -181,7 +181,7 @@ int main(int argc, char* argv[]) {
// Recv MQTT PUBLISH and PUBACK (order depends on broker)
{
for (std::size_t count = 0; count != 2; ++count) {
auto fut = amep.recv(as::use_future);
auto fut = amep->recv(as::use_future);
auto pv = fut.get();
if (pv) {
pv.visit(
Expand Down Expand Up @@ -218,7 +218,7 @@ int main(int argc, char* argv[]) {
}
{
std::cout << "close" << std::endl;
auto fut = amep.close(as::use_future);
auto fut = amep->close(as::use_future);
fut.get();
}
}
Expand Down
Loading

0 comments on commit 95c3598

Please sign in to comment.