Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed multiple close problem. #102

Merged
merged 1 commit into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
## 4.0.0
### breaking changes
- Fixed multiple close problem. In order to do that endpoint become shared_ptr based design. #98, #100, #101, #102
### other updates
- Refined documents. #97
- Added TLS async_shutdown timeout. #99

## 3.0.0
### breaking changes
- Fixed inconsistent function names. #84, #89
Expand Down
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
# http://www.boost.org/LICENSE_1_0.txt)

cmake_minimum_required (VERSION 3.13.0)
project(async_mqtt_iface VERSION 3.0.0)
project(async_mqtt_iface VERSION 4.0.0)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
Expand Down Expand Up @@ -107,7 +107,7 @@ if(DOXYGEN_FOUND)
COMMAND ${CMAKE_COMMAND} -E echo "FILE_PATTERNS = *.hpp" >> ${CMAKE_CURRENT_BINARY_DIR}/Doxyfile
COMMAND ${CMAKE_COMMAND} -E echo "OUTPUT_DIRECTORY = doc" >> ${CMAKE_CURRENT_BINARY_DIR}/Doxyfile
COMMAND ${CMAKE_COMMAND} -E echo "PROJECT_NAME = async_mqtt" >> ${CMAKE_CURRENT_BINARY_DIR}/Doxyfile
COMMAND ${CMAKE_COMMAND} -E echo "PROJECT_NUMBER = 3.0.0" >> ${CMAKE_CURRENT_BINARY_DIR}/Doxyfile
COMMAND ${CMAKE_COMMAND} -E echo "PROJECT_NUMBER = 4.0.0" >> ${CMAKE_CURRENT_BINARY_DIR}/Doxyfile
COMMAND ${CMAKE_COMMAND} -E echo "RECURSIVE = YES" >> ${CMAKE_CURRENT_BINARY_DIR}/Doxyfile
COMMAND ${CMAKE_COMMAND} -E echo "PREDEFINED = _DOXYGEN_ ASYNC_MQTT_USE_TLS ASYNC_MQTT_USE_WS" >> ${CMAKE_CURRENT_BINARY_DIR}/Doxyfile
COMMAND ${CMAKE_COMMAND} -E echo "INPUT = ${CMAKE_CURRENT_SOURCE_DIR}/include/async_mqtt" >> ${CMAKE_CURRENT_BINARY_DIR}/Doxyfile
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

Asynchronous MQTT communication library.

Version 3.0.0 [![Actions Status](https://github.com/redboltz/async_mqtt/workflows/CI/badge.svg)](https://github.com/redboltz/async_mqtt/actions)[![codecov](https://codecov.io/gh/redboltz/async_mqtt/branch/main/graph/badge.svg)](https://codecov.io/gh/redboltz/async_mqtt)
Version 4.0.0 [![Actions Status](https://github.com/redboltz/async_mqtt/workflows/CI/badge.svg)](https://github.com/redboltz/async_mqtt/actions)[![codecov](https://codecov.io/gh/redboltz/async_mqtt/branch/main/graph/badge.svg)](https://codecov.io/gh/redboltz/async_mqtt)

This is Boost.Asio oriented asynchronous MQTT communication library. You can use async_mqtt to develop not only your MQTT client application but also your server (e.g. broker).
Based on https://github.com/redboltz/mqtt_cpp experience, there are many improvements. See overview.
Expand Down
26 changes: 13 additions & 13 deletions example/ep_cb_mqtt_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ int main(int argc, char* argv[]) {
as::io_context ioc;
as::ip::tcp::socket resolve_sock{ioc};
as::ip::tcp::resolver res{resolve_sock.get_executor()};
am::endpoint<am::role::client, am::protocol::mqtt> amep {
auto amep = am::endpoint<am::role::client, am::protocol::mqtt>::create(
am::protocol_version::v3_1_1,
ioc.get_executor()
};
);

std::cout << "start" << std::endl;
std::size_t count = 0;
Expand All @@ -44,7 +44,7 @@ int main(int argc, char* argv[]) {

// Underlying TCP connect
as::async_connect(
amep.next_layer(),
amep->next_layer(),
eps,
[&]
(boost::system::error_code ec, as::ip::tcp::endpoint /*unused*/) {
Expand All @@ -54,7 +54,7 @@ int main(int argc, char* argv[]) {
<< std::endl;
if (ec) return;
// Send MQTT CONNECT
amep.send(
amep->send(
am::v3_1_1::connect_packet{
true, // clean_session
0x1234, // keep_alive
Expand All @@ -70,7 +70,7 @@ int main(int argc, char* argv[]) {
return;
}
// Recv MQTT CONNACK
amep.recv(
amep->recv(
[&]
(am::packet_variant pv) {
if (pv) {
Expand All @@ -82,9 +82,9 @@ int main(int argc, char* argv[]) {
<< " sp:" << p.session_present()
<< std::endl;
// Send MQTT SUBSCRIBE
amep.send(
amep->send(
am::v3_1_1::subscribe_packet{
*amep.acquire_unique_packet_id(),
*amep->acquire_unique_packet_id(),
{ {am::allocate_buffer("topic1"), am::qos::at_most_once} }
},
[&]
Expand All @@ -94,7 +94,7 @@ int main(int argc, char* argv[]) {
return;
}
// Recv MQTT SUBACK
amep.recv(
amep->recv(
[&]
(am::packet_variant pv) {
if (pv) {
Expand All @@ -110,9 +110,9 @@ int main(int argc, char* argv[]) {
}
std::cout << std::endl;
// Send MQTT PUBLISH
amep.send(
amep->send(
am::v3_1_1::publish_packet{
*amep.acquire_unique_packet_id(),
*amep->acquire_unique_packet_id(),
am::allocate_buffer("topic1"),
am::allocate_buffer("payload1"),
am::qos::at_least_once
Expand Down Expand Up @@ -152,11 +152,11 @@ int main(int argc, char* argv[]) {
}
);
if (++count < 2) {
amep.recv(*recv_handler);
amep->recv(*recv_handler);
}
else {
std::cout << "close" << std::endl;
amep.close([]{});
amep->close([]{});
}
}
else {
Expand All @@ -167,7 +167,7 @@ int main(int argc, char* argv[]) {
return;
}
};
amep.recv(*recv_handler);
amep->recv(*recv_handler);
}
);
},
Expand Down
24 changes: 12 additions & 12 deletions example/ep_cpp20coro_mqtt_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ as::awaitable<void>
proc(Executor exe, std::string_view host, std::string_view port) {
as::ip::tcp::socket resolve_sock{exe};
as::ip::tcp::resolver res{exe};
am::endpoint<am::role::client, am::protocol::mqtt> amep {
auto amep = am::endpoint<am::role::client, am::protocol::mqtt>::create(
am::protocol_version::v3_1_1,
exe
};
);
std::cout << "start" << std::endl;

try {
Expand All @@ -35,14 +35,14 @@ proc(Executor exe, std::string_view host, std::string_view port) {

// Underlying TCP connect
co_await as::async_connect(
amep.next_layer(),
amep->next_layer(),
eps,
as::use_awaitable
);
std::cout << "TCP connected" << std::endl;

// Send MQTT CONNECT
if (auto se = co_await amep.send(
if (auto se = co_await amep->send(
am::v3_1_1::connect_packet{
true, // clean_session
0x1234, // keep_alive
Expand All @@ -59,7 +59,7 @@ proc(Executor exe, std::string_view host, std::string_view port) {
}

// Recv MQTT CONNACK
if (am::packet_variant pv = co_await amep.recv(as::use_awaitable)) {
if (am::packet_variant pv = co_await amep->recv(as::use_awaitable)) {
pv.visit(
am::overload {
[&](am::v3_1_1::connack_packet const& p) {
Expand All @@ -84,9 +84,9 @@ proc(Executor exe, std::string_view host, std::string_view port) {
std::vector<am::topic_subopts> sub_entry{
{am::allocate_buffer("topic1"), am::qos::at_most_once}
};
if (auto se = co_await amep.send(
if (auto se = co_await amep->send(
am::v3_1_1::subscribe_packet{
*amep.acquire_unique_packet_id(),
*amep->acquire_unique_packet_id(),
am::force_move(sub_entry) // sub_entry variable is required to avoid g++ bug
},
as::use_awaitable
Expand All @@ -96,7 +96,7 @@ proc(Executor exe, std::string_view host, std::string_view port) {
co_return;
}
// Recv MQTT SUBACK
if (am::packet_variant pv = co_await amep.recv(as::use_awaitable)) {
if (am::packet_variant pv = co_await amep->recv(as::use_awaitable)) {
pv.visit(
am::overload {
[&](am::v3_1_1::suback_packet const& p) {
Expand All @@ -121,9 +121,9 @@ proc(Executor exe, std::string_view host, std::string_view port) {
co_return;
}
// Send MQTT PUBLISH
if (auto se = co_await amep.send(
if (auto se = co_await amep->send(
am::v3_1_1::publish_packet{
*amep.acquire_unique_packet_id(),
*amep->acquire_unique_packet_id(),
am::allocate_buffer("topic1"),
am::allocate_buffer("payload1"),
am::qos::at_least_once
Expand All @@ -136,7 +136,7 @@ proc(Executor exe, std::string_view host, std::string_view port) {
}
// Recv MQTT PUBLISH and PUBACK (order depends on broker)
for (std::size_t count = 0; count != 2; ++count) {
if (am::packet_variant pv = co_await amep.recv(as::use_awaitable)) {
if (am::packet_variant pv = co_await amep->recv(as::use_awaitable)) {
pv.visit(
am::overload {
[&](am::v3_1_1::publish_packet const& p) {
Expand Down Expand Up @@ -169,7 +169,7 @@ proc(Executor exe, std::string_view host, std::string_view port) {
}
}
std::cout << "close" << std::endl;
co_await amep.close(as::use_awaitable);
co_await amep->close(as::use_awaitable);
}
catch (boost::system::system_error const& se) {
std::cout << se.what() << std::endl;
Expand Down
24 changes: 12 additions & 12 deletions example/ep_future_mqtt_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ int main(int argc, char* argv[]) {
as::io_context ioc;
as::ip::tcp::socket resolve_sock{ioc};
as::ip::tcp::resolver res{resolve_sock.get_executor()};
am::endpoint<am::role::client, am::protocol::mqtt> amep {
auto amep = am::endpoint<am::role::client, am::protocol::mqtt>::create(
am::protocol_version::v3_1_1,
ioc.get_executor()
};
);

// async_mqtt thread
auto guard = as::make_work_guard(ioc.get_executor());
Expand Down Expand Up @@ -56,7 +56,7 @@ int main(int argc, char* argv[]) {
auto eps = f_res.get();

auto f_con = as::async_connect(
amep.next_layer(),
amep->next_layer(),
eps,
as::use_future
);
Expand All @@ -65,7 +65,7 @@ int main(int argc, char* argv[]) {

// Send MQTT CONNECT
{
auto fut = amep.send(
auto fut = amep->send(
am::v3_1_1::connect_packet{
true, // clean_session
0x1234, // keep_alive
Expand All @@ -85,7 +85,7 @@ int main(int argc, char* argv[]) {

// Recv MQTT CONNACK
{
auto fut = amep.recv(as::use_future);
auto fut = amep->recv(as::use_future);
auto pv = fut.get(); // get am::packet_variant
if (pv) {
pv.visit(
Expand All @@ -112,9 +112,9 @@ int main(int argc, char* argv[]) {

// Send MQTT SUBSCRIBE
{
auto fut_id = amep.acquire_unique_packet_id(as::use_future);
auto fut_id = amep->acquire_unique_packet_id(as::use_future);
auto pid = fut_id.get();
auto fut = amep.send(
auto fut = amep->send(
am::v3_1_1::subscribe_packet{
*pid,
{ {am::allocate_buffer("topic1"), am::qos::at_most_once} }
Expand All @@ -130,7 +130,7 @@ int main(int argc, char* argv[]) {

// Recv MQTT SUBACK
{
auto fut = amep.recv(as::use_future);
auto fut = amep->recv(as::use_future);
auto pv = fut.get();
if (pv) {
pv.visit(
Expand Down Expand Up @@ -160,9 +160,9 @@ int main(int argc, char* argv[]) {

// Send MQTT PUBLISH
{
auto fut_id = amep.acquire_unique_packet_id(as::use_future);
auto fut_id = amep->acquire_unique_packet_id(as::use_future);
auto pid = fut_id.get();
auto fut = amep.send(
auto fut = amep->send(
am::v3_1_1::publish_packet{
*pid,
am::allocate_buffer("topic1"),
Expand All @@ -181,7 +181,7 @@ int main(int argc, char* argv[]) {
// Recv MQTT PUBLISH and PUBACK (order depends on broker)
{
for (std::size_t count = 0; count != 2; ++count) {
auto fut = amep.recv(as::use_future);
auto fut = amep->recv(as::use_future);
auto pv = fut.get();
if (pv) {
pv.visit(
Expand Down Expand Up @@ -218,7 +218,7 @@ int main(int argc, char* argv[]) {
}
{
std::cout << "close" << std::endl;
auto fut = amep.close(as::use_future);
auto fut = amep->close(as::use_future);
fut.get();
}
}
Expand Down
Loading