Skip to content

Commit

Permalink
Merge pull request #102 from redboltz/make_spize_ep
Browse files Browse the repository at this point in the history
Fixed multiple close problem.
  • Loading branch information
redboltz authored Dec 11, 2023
2 parents 5a6a899 + b59627e commit a14a9b4
Show file tree
Hide file tree
Showing 44 changed files with 1,529 additions and 1,436 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
## 4.0.0
### breaking changes
- Fixed multiple close problem. In order to do that endpoint become shared_ptr based design. #98, #100, #101, #102
### other updates
- Refined documents. #97
- Added TLS async_shutdown timeout. #99

## 3.0.0
### breaking changes
- Fixed inconsistent function names. #84, #89
Expand Down
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
# http://www.boost.org/LICENSE_1_0.txt)

cmake_minimum_required (VERSION 3.13.0)
project(async_mqtt_iface VERSION 3.0.0)
project(async_mqtt_iface VERSION 4.0.0)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
Expand Down Expand Up @@ -107,7 +107,7 @@ if(DOXYGEN_FOUND)
COMMAND ${CMAKE_COMMAND} -E echo "FILE_PATTERNS = *.hpp" >> ${CMAKE_CURRENT_BINARY_DIR}/Doxyfile
COMMAND ${CMAKE_COMMAND} -E echo "OUTPUT_DIRECTORY = doc" >> ${CMAKE_CURRENT_BINARY_DIR}/Doxyfile
COMMAND ${CMAKE_COMMAND} -E echo "PROJECT_NAME = async_mqtt" >> ${CMAKE_CURRENT_BINARY_DIR}/Doxyfile
COMMAND ${CMAKE_COMMAND} -E echo "PROJECT_NUMBER = 3.0.0" >> ${CMAKE_CURRENT_BINARY_DIR}/Doxyfile
COMMAND ${CMAKE_COMMAND} -E echo "PROJECT_NUMBER = 4.0.0" >> ${CMAKE_CURRENT_BINARY_DIR}/Doxyfile
COMMAND ${CMAKE_COMMAND} -E echo "RECURSIVE = YES" >> ${CMAKE_CURRENT_BINARY_DIR}/Doxyfile
COMMAND ${CMAKE_COMMAND} -E echo "PREDEFINED = _DOXYGEN_ ASYNC_MQTT_USE_TLS ASYNC_MQTT_USE_WS" >> ${CMAKE_CURRENT_BINARY_DIR}/Doxyfile
COMMAND ${CMAKE_COMMAND} -E echo "INPUT = ${CMAKE_CURRENT_SOURCE_DIR}/include/async_mqtt" >> ${CMAKE_CURRENT_BINARY_DIR}/Doxyfile
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

Asynchronous MQTT communication library.

Version 3.0.0 [![Actions Status](https://github.com/redboltz/async_mqtt/workflows/CI/badge.svg)](https://github.com/redboltz/async_mqtt/actions)[![codecov](https://codecov.io/gh/redboltz/async_mqtt/branch/main/graph/badge.svg)](https://codecov.io/gh/redboltz/async_mqtt)
Version 4.0.0 [![Actions Status](https://github.com/redboltz/async_mqtt/workflows/CI/badge.svg)](https://github.com/redboltz/async_mqtt/actions)[![codecov](https://codecov.io/gh/redboltz/async_mqtt/branch/main/graph/badge.svg)](https://codecov.io/gh/redboltz/async_mqtt)

This is Boost.Asio oriented asynchronous MQTT communication library. You can use async_mqtt to develop not only your MQTT client application but also your server (e.g. broker).
Based on https://github.com/redboltz/mqtt_cpp experience, there are many improvements. See overview.
Expand Down
26 changes: 13 additions & 13 deletions example/ep_cb_mqtt_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ int main(int argc, char* argv[]) {
as::io_context ioc;
as::ip::tcp::socket resolve_sock{ioc};
as::ip::tcp::resolver res{resolve_sock.get_executor()};
am::endpoint<am::role::client, am::protocol::mqtt> amep {
auto amep = am::endpoint<am::role::client, am::protocol::mqtt>::create(
am::protocol_version::v3_1_1,
ioc.get_executor()
};
);

std::cout << "start" << std::endl;
std::size_t count = 0;
Expand All @@ -44,7 +44,7 @@ int main(int argc, char* argv[]) {

// Underlying TCP connect
as::async_connect(
amep.next_layer(),
amep->next_layer(),
eps,
[&]
(boost::system::error_code ec, as::ip::tcp::endpoint /*unused*/) {
Expand All @@ -54,7 +54,7 @@ int main(int argc, char* argv[]) {
<< std::endl;
if (ec) return;
// Send MQTT CONNECT
amep.send(
amep->send(
am::v3_1_1::connect_packet{
true, // clean_session
0x1234, // keep_alive
Expand All @@ -70,7 +70,7 @@ int main(int argc, char* argv[]) {
return;
}
// Recv MQTT CONNACK
amep.recv(
amep->recv(
[&]
(am::packet_variant pv) {
if (pv) {
Expand All @@ -82,9 +82,9 @@ int main(int argc, char* argv[]) {
<< " sp:" << p.session_present()
<< std::endl;
// Send MQTT SUBSCRIBE
amep.send(
amep->send(
am::v3_1_1::subscribe_packet{
*amep.acquire_unique_packet_id(),
*amep->acquire_unique_packet_id(),
{ {am::allocate_buffer("topic1"), am::qos::at_most_once} }
},
[&]
Expand All @@ -94,7 +94,7 @@ int main(int argc, char* argv[]) {
return;
}
// Recv MQTT SUBACK
amep.recv(
amep->recv(
[&]
(am::packet_variant pv) {
if (pv) {
Expand All @@ -110,9 +110,9 @@ int main(int argc, char* argv[]) {
}
std::cout << std::endl;
// Send MQTT PUBLISH
amep.send(
amep->send(
am::v3_1_1::publish_packet{
*amep.acquire_unique_packet_id(),
*amep->acquire_unique_packet_id(),
am::allocate_buffer("topic1"),
am::allocate_buffer("payload1"),
am::qos::at_least_once
Expand Down Expand Up @@ -152,11 +152,11 @@ int main(int argc, char* argv[]) {
}
);
if (++count < 2) {
amep.recv(*recv_handler);
amep->recv(*recv_handler);
}
else {
std::cout << "close" << std::endl;
amep.close([]{});
amep->close([]{});
}
}
else {
Expand All @@ -167,7 +167,7 @@ int main(int argc, char* argv[]) {
return;
}
};
amep.recv(*recv_handler);
amep->recv(*recv_handler);
}
);
},
Expand Down
24 changes: 12 additions & 12 deletions example/ep_cpp20coro_mqtt_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ as::awaitable<void>
proc(Executor exe, std::string_view host, std::string_view port) {
as::ip::tcp::socket resolve_sock{exe};
as::ip::tcp::resolver res{exe};
am::endpoint<am::role::client, am::protocol::mqtt> amep {
auto amep = am::endpoint<am::role::client, am::protocol::mqtt>::create(
am::protocol_version::v3_1_1,
exe
};
);
std::cout << "start" << std::endl;

try {
Expand All @@ -35,14 +35,14 @@ proc(Executor exe, std::string_view host, std::string_view port) {

// Underlying TCP connect
co_await as::async_connect(
amep.next_layer(),
amep->next_layer(),
eps,
as::use_awaitable
);
std::cout << "TCP connected" << std::endl;

// Send MQTT CONNECT
if (auto se = co_await amep.send(
if (auto se = co_await amep->send(
am::v3_1_1::connect_packet{
true, // clean_session
0x1234, // keep_alive
Expand All @@ -59,7 +59,7 @@ proc(Executor exe, std::string_view host, std::string_view port) {
}

// Recv MQTT CONNACK
if (am::packet_variant pv = co_await amep.recv(as::use_awaitable)) {
if (am::packet_variant pv = co_await amep->recv(as::use_awaitable)) {
pv.visit(
am::overload {
[&](am::v3_1_1::connack_packet const& p) {
Expand All @@ -84,9 +84,9 @@ proc(Executor exe, std::string_view host, std::string_view port) {
std::vector<am::topic_subopts> sub_entry{
{am::allocate_buffer("topic1"), am::qos::at_most_once}
};
if (auto se = co_await amep.send(
if (auto se = co_await amep->send(
am::v3_1_1::subscribe_packet{
*amep.acquire_unique_packet_id(),
*amep->acquire_unique_packet_id(),
am::force_move(sub_entry) // sub_entry variable is required to avoid g++ bug
},
as::use_awaitable
Expand All @@ -96,7 +96,7 @@ proc(Executor exe, std::string_view host, std::string_view port) {
co_return;
}
// Recv MQTT SUBACK
if (am::packet_variant pv = co_await amep.recv(as::use_awaitable)) {
if (am::packet_variant pv = co_await amep->recv(as::use_awaitable)) {
pv.visit(
am::overload {
[&](am::v3_1_1::suback_packet const& p) {
Expand All @@ -121,9 +121,9 @@ proc(Executor exe, std::string_view host, std::string_view port) {
co_return;
}
// Send MQTT PUBLISH
if (auto se = co_await amep.send(
if (auto se = co_await amep->send(
am::v3_1_1::publish_packet{
*amep.acquire_unique_packet_id(),
*amep->acquire_unique_packet_id(),
am::allocate_buffer("topic1"),
am::allocate_buffer("payload1"),
am::qos::at_least_once
Expand All @@ -136,7 +136,7 @@ proc(Executor exe, std::string_view host, std::string_view port) {
}
// Recv MQTT PUBLISH and PUBACK (order depends on broker)
for (std::size_t count = 0; count != 2; ++count) {
if (am::packet_variant pv = co_await amep.recv(as::use_awaitable)) {
if (am::packet_variant pv = co_await amep->recv(as::use_awaitable)) {
pv.visit(
am::overload {
[&](am::v3_1_1::publish_packet const& p) {
Expand Down Expand Up @@ -169,7 +169,7 @@ proc(Executor exe, std::string_view host, std::string_view port) {
}
}
std::cout << "close" << std::endl;
co_await amep.close(as::use_awaitable);
co_await amep->close(as::use_awaitable);
}
catch (boost::system::system_error const& se) {
std::cout << se.what() << std::endl;
Expand Down
24 changes: 12 additions & 12 deletions example/ep_future_mqtt_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ int main(int argc, char* argv[]) {
as::io_context ioc;
as::ip::tcp::socket resolve_sock{ioc};
as::ip::tcp::resolver res{resolve_sock.get_executor()};
am::endpoint<am::role::client, am::protocol::mqtt> amep {
auto amep = am::endpoint<am::role::client, am::protocol::mqtt>::create(
am::protocol_version::v3_1_1,
ioc.get_executor()
};
);

// async_mqtt thread
auto guard = as::make_work_guard(ioc.get_executor());
Expand Down Expand Up @@ -56,7 +56,7 @@ int main(int argc, char* argv[]) {
auto eps = f_res.get();

auto f_con = as::async_connect(
amep.next_layer(),
amep->next_layer(),
eps,
as::use_future
);
Expand All @@ -65,7 +65,7 @@ int main(int argc, char* argv[]) {

// Send MQTT CONNECT
{
auto fut = amep.send(
auto fut = amep->send(
am::v3_1_1::connect_packet{
true, // clean_session
0x1234, // keep_alive
Expand All @@ -85,7 +85,7 @@ int main(int argc, char* argv[]) {

// Recv MQTT CONNACK
{
auto fut = amep.recv(as::use_future);
auto fut = amep->recv(as::use_future);
auto pv = fut.get(); // get am::packet_variant
if (pv) {
pv.visit(
Expand All @@ -112,9 +112,9 @@ int main(int argc, char* argv[]) {

// Send MQTT SUBSCRIBE
{
auto fut_id = amep.acquire_unique_packet_id(as::use_future);
auto fut_id = amep->acquire_unique_packet_id(as::use_future);
auto pid = fut_id.get();
auto fut = amep.send(
auto fut = amep->send(
am::v3_1_1::subscribe_packet{
*pid,
{ {am::allocate_buffer("topic1"), am::qos::at_most_once} }
Expand All @@ -130,7 +130,7 @@ int main(int argc, char* argv[]) {

// Recv MQTT SUBACK
{
auto fut = amep.recv(as::use_future);
auto fut = amep->recv(as::use_future);
auto pv = fut.get();
if (pv) {
pv.visit(
Expand Down Expand Up @@ -160,9 +160,9 @@ int main(int argc, char* argv[]) {

// Send MQTT PUBLISH
{
auto fut_id = amep.acquire_unique_packet_id(as::use_future);
auto fut_id = amep->acquire_unique_packet_id(as::use_future);
auto pid = fut_id.get();
auto fut = amep.send(
auto fut = amep->send(
am::v3_1_1::publish_packet{
*pid,
am::allocate_buffer("topic1"),
Expand All @@ -181,7 +181,7 @@ int main(int argc, char* argv[]) {
// Recv MQTT PUBLISH and PUBACK (order depends on broker)
{
for (std::size_t count = 0; count != 2; ++count) {
auto fut = amep.recv(as::use_future);
auto fut = amep->recv(as::use_future);
auto pv = fut.get();
if (pv) {
pv.visit(
Expand Down Expand Up @@ -218,7 +218,7 @@ int main(int argc, char* argv[]) {
}
{
std::cout << "close" << std::endl;
auto fut = amep.close(as::use_future);
auto fut = amep->close(as::use_future);
fut.get();
}
}
Expand Down
Loading

0 comments on commit a14a9b4

Please sign in to comment.