diff --git a/include/async_mqtt/broker/broker.hpp b/include/async_mqtt/broker/broker.hpp index 4bfcd09d5..f9ea5cd73 100644 --- a/include/async_mqtt/broker/broker.hpp +++ b/include/async_mqtt/broker/broker.hpp @@ -734,76 +734,90 @@ class broker { return true; } - void send_connack( + template + auto send_connack( epsp_t& epsp, bool session_present, bool authenticated, properties props, - std::function finish = [](system_error const&){} + CompletionToken&& token ) { - ASYNC_MQTT_LOG("mqtt_broker", trace) - << ASYNC_MQTT_ADD_VALUE(address, epsp.get_address()) - << "send_connack"; - // Reply to the connect message. - switch (epsp.get_protocol_version()) { - case protocol_version::v3_1_1: - if (connack_) { - epsp.send( - v3_1_1::connack_packet{ - session_present, - authenticated ? connect_return_code::accepted - : connect_return_code::not_authorized, - }, - [finish = force_move(finish)] - (system_error const& ec) { - finish(ec); + auto init = + [this] + ( + auto completion_handler, + epsp_t epsp, + bool session_present, + bool authenticated, + properties props + ) { + ASYNC_MQTT_LOG("mqtt_broker", trace) + << ASYNC_MQTT_ADD_VALUE(address, epsp.get_address()) + << "send_connack"; + // Reply to the connect message. + switch (epsp.get_protocol_version()) { + case protocol_version::v3_1_1: + if (connack_) { + epsp.send( + v3_1_1::connack_packet{ + session_present, + authenticated ? connect_return_code::accepted + : connect_return_code::not_authorized, + }, + force_move(completion_handler) + ); } - ); - } - break; - case protocol_version::v5: - // connack_props_ member varible is for testing - if (connack_props_.empty()) { - // props local variable is is for real case - props.emplace_back(property::topic_alias_maximum{topic_alias_max}); - props.emplace_back(property::receive_maximum{receive_maximum_max}); - if (connack_) { - epsp.send( - v5::connack_packet{ - session_present, - authenticated ? connect_reason_code::success - : connect_reason_code::not_authorized, - force_move(props) - }, - [finish = force_move(finish)] - (system_error const& ec) { - finish(ec); + break; + case protocol_version::v5: + // connack_props_ member varible is for testing + if (connack_props_.empty()) { + // props local variable is is for real case + props.emplace_back(property::topic_alias_maximum{topic_alias_max}); + props.emplace_back(property::receive_maximum{receive_maximum_max}); + if (connack_) { + epsp.send( + v5::connack_packet{ + session_present, + authenticated ? connect_reason_code::success + : connect_reason_code::not_authorized, + force_move(props) + }, + force_move(completion_handler) + ); } - ); - } - } - else { - // use connack_props_ for testing - if (connack_) { - epsp.send( - v5::connack_packet{ - session_present, - authenticated ? connect_reason_code::success - : connect_reason_code::not_authorized, - connack_props_ - }, - [finish = force_move(finish)] - (system_error const& ec) { - finish(ec); + } + else { + // use connack_props_ for testing + if (connack_) { + epsp.send( + v5::connack_packet{ + session_present, + authenticated ? connect_reason_code::success + : connect_reason_code::not_authorized, + connack_props_ + }, + force_move(completion_handler) + ); } - ); + } + break; + default: + BOOST_ASSERT(false); + break; } - } - break; - default: - BOOST_ASSERT(false); - break; - } + }; + + return as::async_initiate< + CompletionToken, + void(system_error const&) + >( + init, + token, + force_move(epsp), + session_present, + authenticated, + force_move(props) + ); } template @@ -1796,43 +1810,106 @@ class broker { * @return true if offline session is remained, otherwise false */ // TODO: Maybe change the name of this function. - template - void close_proc_no_lock( + template + auto close_proc_no_lock( epsp_t epsp, bool send_will, optional rc_opt, - Finish&& finish) { - - ASYNC_MQTT_LOG("mqtt_broker", trace) - << ASYNC_MQTT_ADD_VALUE(address, epsp.get_address()) - << "close_proc_no_lock"; - - auto& idx = sessions_.template get(); - auto it = idx.find(epsp); + CompletionToken&& token) { + + auto init = + [this] + ( + auto completion_handler, + epsp_t epsp, + bool send_will, + optional rc_opt + ) { + ASYNC_MQTT_LOG("mqtt_broker", trace) + << ASYNC_MQTT_ADD_VALUE(address, epsp.get_address()) + << "close_proc_no_lock"; + + auto& idx = sessions_.template get(); + auto it = idx.find(epsp); + + // act_sess_it == act_sess_idx.end() could happen if broker accepts + // the session from client but the client closes the session before sending + // MQTT `CONNECT` message. + // In this case, do nothing is correct behavior. + if (it == idx.end()) { + force_move(completion_handler)(false); + return; + } - // act_sess_it == act_sess_idx.end() could happen if broker accepts - // the session from client but the client closes the session before sending - // MQTT `CONNECT` message. - // In this case, do nothing is correct behavior. - if (it == idx.end()) { - finish(false); - return; - } + auto do_send_will = + [&](session_state& ss) { + if (send_will) { + ss.send_will(); + } + else { + ss.clear_will(); + } + }; - auto do_send_will = - [&](session_state& ss) { - if (send_will) { - ss.send_will(); + if ((*it)->remain_after_close()) { + idx.modify( + it, + [&](std::shared_ptr>& sssp) { + do_send_will(*sssp); + if (rc_opt) { + ASYNC_MQTT_LOG("mqtt_broker", trace) + << ASYNC_MQTT_ADD_VALUE(address, epsp.get_address()) + << "disconnect_and_close() cid:" << sssp->client_id(); + disconnect_and_close( + epsp, + (*it)->get_protocol_version(), + *rc_opt, + [this, sssp, epsp, completion_handler = force_move(completion_handler)] + () mutable { + // become_offline updates index + sssp->become_offline( + epsp, + [this] + (std::shared_ptr const& sp_tim) { + // lock for expire (async) + std::lock_guard g(mtx_sessions_); + sessions_.template get().erase(sp_tim); + } + ); + force_move(completion_handler)(true); + } + ); + } + else { + ASYNC_MQTT_LOG("mqtt_broker", trace) + << ASYNC_MQTT_ADD_VALUE(address, epsp.get_address()) + << "close cid:" << sssp->client_id(); + epsp.close( + [this, epsp, sssp, completion_handler = force_move(completion_handler)] + () mutable { + ASYNC_MQTT_LOG("mqtt_broker", info) + << ASYNC_MQTT_ADD_VALUE(address, epsp.get_address()) + << "closed"; + // become_offline updates index + sssp->become_offline( + epsp, + [this] + (std::shared_ptr const& sp_tim) { + // lock for expire (async) + std::lock_guard g(mtx_sessions_); + sessions_.template get().erase(sp_tim); + } + ); + force_move(completion_handler)(true); + } + ); + } + }, + [](auto&) { BOOST_ASSERT(false); } + ); } else { - ss.clear_will(); - } - }; - - if ((*it)->remain_after_close()) { - idx.modify( - it, - [&](std::shared_ptr>& sssp) { + auto sssp{force_move(idx.extract(it).value())}; do_send_will(*sssp); if (rc_opt) { ASYNC_MQTT_LOG("mqtt_broker", trace) @@ -1840,21 +1917,14 @@ class broker { << "disconnect_and_close() cid:" << sssp->client_id(); disconnect_and_close( epsp, - (*it)->get_protocol_version(), + sssp->get_protocol_version(), *rc_opt, - [this, sssp, epsp, finish = std::forward(finish)] + [sssp, epsp, completion_handler = force_move(completion_handler)] () mutable { - // become_offline updates index - sssp->become_offline( - epsp, - [this] - (std::shared_ptr const& sp_tim) { - // lock for expire (async) - std::lock_guard g(mtx_sessions_); - sessions_.template get().erase(sp_tim); - } - ); - finish(true); + ASYNC_MQTT_LOG("mqtt_broker", info) + << ASYNC_MQTT_ADD_VALUE(address, epsp.get_address()) + << "disconnect_and_closed"; + force_move(completion_handler)(false); } ); } @@ -1863,64 +1933,28 @@ class broker { << ASYNC_MQTT_ADD_VALUE(address, epsp.get_address()) << "close cid:" << sssp->client_id(); epsp.close( - [this, epsp, sssp, finish = std::forward(finish)] + [sssp, epsp, completion_handler = force_move(completion_handler)] () mutable { ASYNC_MQTT_LOG("mqtt_broker", info) << ASYNC_MQTT_ADD_VALUE(address, epsp.get_address()) << "closed"; - // become_offline updates index - sssp->become_offline( - epsp, - [this] - (std::shared_ptr const& sp_tim) { - // lock for expire (async) - std::lock_guard g(mtx_sessions_); - sessions_.template get().erase(sp_tim); - } - ); - finish(true); + force_move(completion_handler)(false); } ); } - }, - [](auto&) { BOOST_ASSERT(false); } - ); - } - else { - auto sssp{force_move(idx.extract(it).value())}; - do_send_will(*sssp); - if (rc_opt) { - ASYNC_MQTT_LOG("mqtt_broker", trace) - << ASYNC_MQTT_ADD_VALUE(address, epsp.get_address()) - << "disconnect_and_close() cid:" << sssp->client_id(); - disconnect_and_close( - epsp, - sssp->get_protocol_version(), - *rc_opt, - [sssp, epsp, finish = std::forward(finish)] - () mutable { - ASYNC_MQTT_LOG("mqtt_broker", info) - << ASYNC_MQTT_ADD_VALUE(address, epsp.get_address()) - << "disconnect_and_closed"; - finish(false); - } - ); - } - else { - ASYNC_MQTT_LOG("mqtt_broker", trace) - << ASYNC_MQTT_ADD_VALUE(address, epsp.get_address()) - << "close cid:" << sssp->client_id(); - epsp.close( - [sssp, epsp, finish = std::forward(finish)] - () mutable { - ASYNC_MQTT_LOG("mqtt_broker", info) - << ASYNC_MQTT_ADD_VALUE(address, epsp.get_address()) - << "closed"; - finish(false); - } - ); - } - } + } + }; + + return as::async_initiate< + CompletionToken, + void(bool) + >( + init, + token, + force_move(epsp), + send_will, + force_move(rc_opt) + ); } /** @@ -1958,52 +1992,72 @@ class broker { if (h_auth_props_) h_auth_props_(force_move(props)); } - template - static void disconnect_and_close( + template + static auto disconnect_and_close( epsp_t epsp, protocol_version version, disconnect_reason_code rc, - Finish&& finish = []{} + CompletionToken&& token ) { - switch (version) { - case protocol_version::v3_1_1: - epsp.close( - [epsp, finish = std::forward(finish)] - () mutable { - ASYNC_MQTT_LOG("mqtt_broker", info) - << ASYNC_MQTT_ADD_VALUE(address, epsp.get_address()) - << "closed"; - finish(); - } - ); - break; - case protocol_version::v5: - epsp.send( - v5::disconnect_packet{ - rc, - properties{} - }, - [epsp, finish = std::forward(finish)] - (system_error const& ec) mutable { - ASYNC_MQTT_LOG("mqtt_broker", info) - << ASYNC_MQTT_ADD_VALUE(address, epsp.get_address()) - << ec.what(); + auto init = + [] + ( + auto completion_handler, + epsp_t epsp, + protocol_version version, + disconnect_reason_code rc + ) { + switch (version) { + case protocol_version::v3_1_1: epsp.close( - [epsp, finish = force_move(finish)] + [epsp, completion_handler = force_move(completion_handler)] () mutable { ASYNC_MQTT_LOG("mqtt_broker", info) << ASYNC_MQTT_ADD_VALUE(address, epsp.get_address()) << "closed"; - finish(); + force_move(completion_handler)(); } ); + break; + case protocol_version::v5: + epsp.send( + v5::disconnect_packet{ + rc, + properties{} + }, + [epsp, completion_handler = force_move(completion_handler)] + (system_error const& ec) mutable { + ASYNC_MQTT_LOG("mqtt_broker", info) + << ASYNC_MQTT_ADD_VALUE(address, epsp.get_address()) + << ec.what(); + epsp.close( + [epsp, completion_handler = force_move(completion_handler)] + () mutable { + ASYNC_MQTT_LOG("mqtt_broker", info) + << ASYNC_MQTT_ADD_VALUE(address, epsp.get_address()) + << "closed"; + force_move(completion_handler)(); + } + ); + } + ); + break; + default: + BOOST_ASSERT(false); + break; } - ); - break; - default: - BOOST_ASSERT(false); - break; - } + }; + + return as::async_initiate< + CompletionToken, + void() + >( + init, + token, + force_move(epsp), + version, + rc + ); } private: diff --git a/include/async_mqtt/util/scope_guard.hpp b/include/async_mqtt/util/scope_guard.hpp index eb1615cee..b8b916c8e 100644 --- a/include/async_mqtt/util/scope_guard.hpp +++ b/include/async_mqtt/util/scope_guard.hpp @@ -14,7 +14,7 @@ namespace async_mqtt { template inline auto unique_scope_guard(Proc&& proc) { - auto deleter = [proc = std::forward(proc)](void*) mutable { std::forward(proc)(); }; + auto deleter = [proc = std::forward(proc)](void*) mutable { return std::forward(proc)(); }; return std::unique_ptr(&deleter, force_move(deleter)); }