diff --git a/include/mqtt/client.hpp b/include/mqtt/client.hpp index f6981f61..f907bef7 100644 --- a/include/mqtt/client.hpp +++ b/include/mqtt/client.hpp @@ -1208,7 +1208,7 @@ class client : public endpoint { >* = nullptr > auto async_connect(CompletionToken&& token) { - return async_connect(any(), token); + return async_connect(any(), std::forward(token)); } /** @@ -1228,7 +1228,7 @@ class client : public endpoint { async_connect( v5::properties{}, force_move(session_life_keeper), - token + std::forward(token) ); } @@ -1265,7 +1265,7 @@ class client : public endpoint { >* = nullptr > auto async_connect(v5::properties props, CompletionToken&& token) { - return async_connect(force_move(props), any(), token); + return async_connect(force_move(props), any(), std::forward(token)); } /** @@ -1344,7 +1344,7 @@ class client : public endpoint { async_connect( force_move(socket), v5::properties{}, any(), - token + std::forward(token) ); } @@ -1368,7 +1368,7 @@ class client : public endpoint { force_move(socket), v5::properties{}, force_move(session_life_keeper), - token + std::forward(token) ); } @@ -1427,7 +1427,7 @@ class client : public endpoint { force_move(socket), force_move(props), any(), - token + std::forward(token) ); } @@ -1586,7 +1586,7 @@ class client : public endpoint { base::async_disconnect( reason_code, force_move(props), - token + std::forward(token) ); } @@ -1622,7 +1622,7 @@ class client : public endpoint { base::async_disconnect( reason_code, force_move(props), - token + std::forward(token) ); } @@ -1663,7 +1663,7 @@ class client : public endpoint { ); return base::async_disconnect( - token + std::forward(token) ); } @@ -1962,9 +1962,11 @@ class client : public endpoint { ) { BOOST_ASSERT(state == initiate); state = resolve; - resolver.async_resolve( - cl.host_, - cl.port_, + auto& a_cl{cl}; + auto& a_resolver{resolver}; + a_resolver.async_resolve( + a_cl.host_, + a_cl.port_, force_move(self) ); } @@ -1989,13 +1991,14 @@ class client : public endpoint { async_start_session(force_move(self)); break; #if defined(MQTT_USE_WS) - case tls_ws_handshake_tls: + case tls_ws_handshake_tls: { state = tls_ws_handshake_ws; + auto& socket = *cl.socket_; async_ws_handshake_socket( - *cl.socket_, + socket, force_move(self) ); - break; + } break; case tls_ws_handshake_ws: async_start_session(force_move(self)); break; @@ -2025,8 +2028,9 @@ class client : public endpoint { } else { state = connect; + auto& a_cl{cl}; as::async_connect( - cl.socket_->lowest_layer(), eps.begin(), eps.end(), force_move(self) + a_cl.socket_->lowest_layer(), eps.begin(), eps.end(), force_move(self) ); } } @@ -2046,8 +2050,9 @@ class client : public endpoint { if (cl.ping_duration_ != std::chrono::steady_clock::duration::zero()) { cl.set_timer(); } + auto& socket = *cl.socket_; async_handshake_socket( - *cl.socket_, + socket, force_move(self) ); } @@ -2067,9 +2072,10 @@ class client : public endpoint { ws_endpoint& socket, Self&& self) { state = ws_handshake; + auto& a_cl{cl}; socket.async_handshake( - cl.host_, - cl.path_, + a_cl.host_, + a_cl.path_, force_move(self) ); } @@ -2107,9 +2113,10 @@ class client : public endpoint { ws_endpoint, Strand>& socket, Self&& self) { state = tls_ws_handshake_ws; + auto& a_cl{cl}; socket.async_handshake( - cl.host_, - cl.path_, + a_cl.host_, + a_cl.path_, force_move(self) ); } @@ -2127,18 +2134,20 @@ class client : public endpoint { template void async_start_session(Self&& self) { state = complete; - cl.async_read_control_packet_type(force_move(session_life_keeper)); + auto& a_cl{cl}; + auto a_props = force_move(props); + a_cl.async_read_control_packet_type(force_move(session_life_keeper)); // sync base::connect() refer to parameters only in the function. // So they can be passed as view. - cl.base_async_connect( - buffer(string_view(static_cast(cl).get_client_id())), - (cl.user_name_ ? buffer(string_view(cl.user_name_.value())) - : optional() ), - ( cl.password_ ? buffer(string_view(cl.password_.value())) - : optional() ), - cl.will_, - cl.keep_alive_sec_, - force_move(props), + a_cl.base_async_connect( + buffer(string_view(static_cast(a_cl).get_client_id())), + (a_cl.user_name_ ? buffer(string_view(a_cl.user_name_.value())) + : optional() ), + (a_cl.password_ ? buffer(string_view(a_cl.password_.value())) + : optional() ), + a_cl.will_, + a_cl.keep_alive_sec_, + force_move(a_props), force_move(self) ); } diff --git a/test/system/st_comp_token.cpp b/test/system/st_comp_token.cpp index 8bc17e40..1b6250a7 100644 --- a/test/system/st_comp_token.cpp +++ b/test/system/st_comp_token.cpp @@ -18,102 +18,506 @@ using namespace MQTT_NS::literals; BOOST_AUTO_TEST_CASE( future ) { - MQTT_NS::setup_log( + auto test = [](boost::asio::io_context& ioc, auto& cs, auto finish, auto& /*b*/) { + + auto& c = cs[0]; + clear_ordered(); + using packet_id_t = typename std::remove_reference_t::packet_id_t; + + auto wg = boost::asio::make_work_guard(ioc.get_executor()); + + c->set_client_id("cid1"); + c->set_clean_session(true); + + checker chk = { + // connect + cont("c_connect"), + cont("g_connect"), + + cont("c_subscribe"), + cont("g_subscribe"), + + cont("c_publish"), + cont("g_publish"), + + cont("c_unsubscribe"), + cont("g_unsubscribe"), + + cont("c_disconnect"), + cont("g_disconnect"), + }; + + // multiple times called handlers is still callback + c->set_close_handler( + [&] { + } + ); + c->set_error_handler( + [&] + (MQTT_NS::error_code) { + } + ); + + switch (c->get_protocol_version()) { + case MQTT_NS::protocol_version::v3_1_1: + c->set_connack_handler( + [&] + (bool, MQTT_NS::connect_return_code) { + }); + c->set_puback_handler( + [&] + (packet_id_t) { + BOOST_CHECK(false); + }); + c->set_pubrec_handler( + [&] + (std::uint16_t) { + BOOST_CHECK(false); + }); + c->set_pubcomp_handler( + [&] + (std::uint16_t) { + BOOST_CHECK(false); + }); + c->set_suback_handler( + [&] + (packet_id_t, std::vector) { + }); + c->set_unsuback_handler( + [&] + (packet_id_t) { + }); + c->set_publish_handler( + [&] + (MQTT_NS::optional, + MQTT_NS::publish_options, + MQTT_NS::buffer, + MQTT_NS::buffer) { + }); + break; + case MQTT_NS::protocol_version::v5: + c->set_v5_connack_handler( + [&] + (bool, MQTT_NS::v5::connect_reason_code, MQTT_NS::v5::properties) { + }); + c->set_v5_puback_handler( + [&] + (packet_id_t, MQTT_NS::v5::puback_reason_code, MQTT_NS::v5::properties) { + BOOST_CHECK(false); + }); + c->set_v5_pubrec_handler( + [&] + (packet_id_t, MQTT_NS::v5::pubrec_reason_code, MQTT_NS::v5::properties) { + BOOST_CHECK(false); + }); + c->set_v5_pubcomp_handler( + [&] + (packet_id_t, MQTT_NS::v5::pubcomp_reason_code, MQTT_NS::v5::properties) { + BOOST_CHECK(false); + }); + c->set_v5_suback_handler( + [&] + (packet_id_t, std::vector, MQTT_NS::v5::properties) { + }); + c->set_v5_unsuback_handler( + [&] + (packet_id_t, std::vector, MQTT_NS::v5::properties) { + }); + c->set_v5_publish_handler( + [&] + (MQTT_NS::optional, + MQTT_NS::publish_options, + MQTT_NS::buffer, + MQTT_NS::buffer, + MQTT_NS::v5::properties) { + }); + break; + default: + BOOST_CHECK(false); + break; + } + + // future based code + + std::thread th_lib { + [&] { + try { + ioc.run(); + } + catch (std::exception const& e) { + BOOST_TEST_INFO(e.what()); + BOOST_CHECK(false); + } + } + }; + + // one-shot handler can be replaced with boost::asio::use_future { - { "mqtt_api", MQTT_NS::severity_level::trace }, - { "mqtt_cb", MQTT_NS::severity_level::trace }, - { "mqtt_impl", MQTT_NS::severity_level::trace }, - { "mqtt_broker", MQTT_NS::severity_level::trace }, + MQTT_CHK("c_connect"); + auto f = c->async_connect(boost::asio::use_future); + try { + f.get(); + MQTT_CHK("g_connect"); + } + catch (std::exception const& e) { + BOOST_TEST_INFO(e.what()); + BOOST_CHECK(false); + } } - ); - - boost::asio::io_context iocb; - MQTT_NS::broker::broker_t b(iocb); - MQTT_NS::optional s; - std::promise p; - auto f = p.get_future(); - std::thread th( - [&] { - s.emplace(iocb, b); - p.set_value(); - iocb.run(); + { + MQTT_CHK("c_subscribe"); + auto pid_sub = c->acquire_unique_packet_id(); + auto f = c->async_subscribe(pid_sub, "topic1", MQTT_NS::qos::exactly_once, boost::asio::use_future); + try { + f.get(); + MQTT_CHK("g_subscribe"); + } + catch (std::exception const& e) { + BOOST_TEST_INFO(e.what()); + BOOST_CHECK(false); + } } - ); - f.wait(); - auto finish = - [&] { - as::post( - iocb, - [&] { - s->close(); - } - ); + { + MQTT_CHK("c_publish"); + auto f = c->async_publish("topic1", "topic1_contents", MQTT_NS::qos::at_most_once, boost::asio::use_future); + try { + f.get(); + MQTT_CHK("g_publish"); + } + catch (std::exception const& e) { + BOOST_TEST_INFO(e.what()); + BOOST_CHECK(false); + } + } + { + MQTT_CHK("c_unsubscribe"); + auto pid_unsub = c->acquire_unique_packet_id(); + auto f = c->async_unsubscribe(pid_unsub, "topic1", boost::asio::use_future); + try { + f.get(); + MQTT_CHK("g_unsubscribe"); + } + catch (std::exception const& e) { + BOOST_TEST_INFO(e.what()); + BOOST_CHECK(false); + } + } + { + MQTT_CHK("c_disconnect"); + auto f = c->async_disconnect(boost::asio::use_future); + try { + f.get(); + MQTT_CHK("g_disconnect"); + } + catch (std::exception const& e) { + BOOST_TEST_INFO(e.what()); + BOOST_CHECK(false); + } + } + + wg.reset(); + finish(); + th_lib.join(); + BOOST_TEST(chk.all()); + }; + do_combi_test_async(test); +} + +BOOST_AUTO_TEST_CASE( user_strand ) { + auto test = [](boost::asio::io_context& ioc, auto& cs, auto finish, auto& /*b*/) { + + auto& c = cs[0]; + clear_ordered(); + using packet_id_t = typename std::remove_reference_t::packet_id_t; + + auto str_user = boost::asio::make_strand(ioc.get_executor()); + auto wg = boost::asio::make_work_guard(ioc.get_executor()); + + c->set_client_id("cid1"); + c->set_clean_session(true); + + checker chk = { + // connect + cont("h_connack"), + cont("h_suback"), + cont("h_publish"), + cont("h_unsuback"), + cont("h_close"), }; - boost::asio::io_context ioc; + c->set_close_handler( + [&] { + MQTT_CHK("h_close"); + finish(); + wg.reset(); + } + ); + c->set_error_handler( + [&] + (MQTT_NS::error_code) { + } + ); - auto c = MQTT_NS::make_client(ioc, broker_url, broker_notls_port, MQTT_NS::protocol_version::v3_1_1); - std::cout << "start" << std::endl; - c->set_client_id("cid1"); - c->set_clean_session(true); + switch (c->get_protocol_version()) { - c->set_close_handler( - [] { - std::cout << "closed" << std::endl; - } - ); - c->set_error_handler( - [] - (MQTT_NS::error_code ec) { - std::cout << ec.message() << std::endl; + case MQTT_NS::protocol_version::v3_1_1: + c->set_connack_handler( + boost::asio::bind_executor( + str_user, + [&] + (bool, MQTT_NS::connect_return_code) { + MQTT_CHK("h_connack"); + BOOST_TEST(str_user.running_in_this_thread()); + auto pid_sub = c->acquire_unique_packet_id(); + c->async_subscribe( + pid_sub, + "topic1", + MQTT_NS::qos::exactly_once, + boost::asio::bind_executor( + str_user, + [&](MQTT_NS::error_code) { + BOOST_TEST(str_user.running_in_this_thread()); + } + ) + ); + } + ) + ); + c->set_puback_handler( + boost::asio::bind_executor( + str_user, + [&] + (packet_id_t) { + BOOST_CHECK(false); + } + ) + ); + c->set_pubrec_handler( + boost::asio::bind_executor( + str_user, + [&] + (std::uint16_t) { + BOOST_CHECK(false); + } + ) + ); + c->set_pubcomp_handler( + boost::asio::bind_executor( + str_user, + [&] + (std::uint16_t) { + BOOST_CHECK(false); + } + ) + ); + c->set_suback_handler( + boost::asio::bind_executor( + str_user, + [&] + (packet_id_t, std::vector) { + MQTT_CHK("h_suback"); + BOOST_TEST(str_user.running_in_this_thread()); + c->async_publish( + "topic1", + "topic1_contents", + MQTT_NS::qos::at_most_once, + boost::asio::bind_executor( + str_user, + [&](MQTT_NS::error_code) { + BOOST_TEST(str_user.running_in_this_thread()); + } + ) + ); + } + ) + ); + c->set_unsuback_handler( + boost::asio::bind_executor( + str_user, + [&] + (packet_id_t) { + MQTT_CHK("h_unsuback"); + BOOST_TEST(str_user.running_in_this_thread()); + c->async_disconnect( + boost::asio::bind_executor( + str_user, + [&](MQTT_NS::error_code) { + BOOST_TEST(str_user.running_in_this_thread()); + } + ) + ); + } + ) + ); + c->set_publish_handler( + boost::asio::bind_executor( + str_user, + [&] + (MQTT_NS::optional, + MQTT_NS::publish_options, + MQTT_NS::buffer, + MQTT_NS::buffer) { + MQTT_CHK("h_publish"); + BOOST_TEST(str_user.running_in_this_thread()); + auto pid_unsub = c->acquire_unique_packet_id(); + c->async_unsubscribe( + pid_unsub, + "topic1", + boost::asio::bind_executor( + str_user, + [&](MQTT_NS::error_code) { + BOOST_TEST(str_user.running_in_this_thread()); + } + ) + ); + } + ) + ); + break; + case MQTT_NS::protocol_version::v5: + c->set_v5_connack_handler( + boost::asio::bind_executor( + str_user, + [&] + (bool, MQTT_NS::v5::connect_reason_code, MQTT_NS::v5::properties) { + MQTT_CHK("h_connack"); + BOOST_TEST(str_user.running_in_this_thread()); + auto pid_sub = c->acquire_unique_packet_id(); + c->async_subscribe( + pid_sub, + "topic1", + MQTT_NS::qos::exactly_once, + boost::asio::bind_executor( + str_user, + [&](MQTT_NS::error_code) { + BOOST_TEST(str_user.running_in_this_thread()); + } + ) + ); + } + ) + ); + c->set_v5_puback_handler( + boost::asio::bind_executor( + str_user, + [&] + (packet_id_t, MQTT_NS::v5::puback_reason_code, MQTT_NS::v5::properties) { + BOOST_CHECK(false); + } + ) + ); + c->set_v5_pubrec_handler( + boost::asio::bind_executor( + str_user, + [&] + (packet_id_t, MQTT_NS::v5::pubrec_reason_code, MQTT_NS::v5::properties) { + BOOST_CHECK(false); + } + ) + ); + c->set_v5_pubcomp_handler( + boost::asio::bind_executor( + str_user, + [&] + (packet_id_t, MQTT_NS::v5::pubcomp_reason_code, MQTT_NS::v5::properties) { + BOOST_CHECK(false); + } + ) + ); + c->set_v5_suback_handler( + boost::asio::bind_executor( + str_user, + [&] + (packet_id_t, std::vector, MQTT_NS::v5::properties) { + MQTT_CHK("h_suback"); + BOOST_TEST(str_user.running_in_this_thread()); + c->async_publish( + "topic1", + "topic1_contents", + MQTT_NS::qos::at_most_once, + boost::asio::bind_executor( + str_user, + [&](MQTT_NS::error_code) { + BOOST_TEST(str_user.running_in_this_thread()); + } + ) + ); + } + ) + ); + c->set_v5_unsuback_handler( + boost::asio::bind_executor( + str_user, + [&] + (packet_id_t, std::vector, MQTT_NS::v5::properties) { + MQTT_CHK("h_unsuback"); + BOOST_TEST(str_user.running_in_this_thread()); + c->async_disconnect( + boost::asio::bind_executor( + str_user, + [&](MQTT_NS::error_code) { + BOOST_TEST(str_user.running_in_this_thread()); + } + ) + ); + } + ) + ); + c->set_v5_publish_handler( + boost::asio::bind_executor( + str_user, + [&] + (MQTT_NS::optional, + MQTT_NS::publish_options, + MQTT_NS::buffer, + MQTT_NS::buffer, + MQTT_NS::v5::properties) { + MQTT_CHK("h_publish"); + BOOST_TEST(str_user.running_in_this_thread()); + auto pid_unsub = c->acquire_unique_packet_id(); + c->async_unsubscribe( + pid_unsub, + "topic1", + boost::asio::bind_executor( + str_user, + [&](MQTT_NS::error_code) { + BOOST_TEST(str_user.running_in_this_thread()); + } + ) + ); + } + ) + ); + break; + default: + BOOST_CHECK(false); + break; } - ); - boost::asio::io_context ioc_user; + c->async_connect( + boost::asio::bind_executor( + str_user, + [&](MQTT_NS::error_code) { + BOOST_TEST(str_user.running_in_this_thread()); + } + ) + ); - auto wg = boost::asio::make_work_guard(ioc.get_executor()); - std::thread th_lib { - [&] { - ioc.run(); - std::cout << "run exit" << std::endl; - } - }; + std::thread th_user { + [&] { + try { + ioc.run(); + } + catch (std::exception const& e) { + BOOST_TEST_INFO(e.what()); + BOOST_CHECK(false); + } + } + }; + th_user.join(); - { - std::cout << "connect" << std::endl; - auto f = c->async_connect(boost::asio::use_future); - f.get(); - std::cout << "connect get" << std::endl; - } - { - std::cout << "subscribe" << std::endl; - auto pid_sub = c->acquire_unique_packet_id(); - auto f = c->async_subscribe(pid_sub, "topic1", MQTT_NS::qos::exactly_once, boost::asio::use_future); - f.get(); - std::cout << "subscribe get" << std::endl; - } - { - std::cout << "publish" << std::endl; - auto pid_sub = c->acquire_unique_packet_id(); - auto f = c->async_publish("topic1", "topic1_contents", MQTT_NS::qos::at_most_once, boost::asio::use_future); - f.get(); - std::cout << "publish get" << std::endl; - } - { - std::cout << "disconnect" << std::endl; - auto f = c->async_disconnect(boost::asio::use_future); - f.get(); - std::cout << "disconnect get" << std::endl; - } - - std::cout << "reset" << std::endl; - wg.reset(); - std::cout << "finish" << std::endl; - finish(); - std::cout << "finish end" << std::endl; - th_lib.join(); - std::cout << "joined" << std::endl; - th.join(); + }; + do_combi_test_async(test); } BOOST_AUTO_TEST_SUITE_END()