diff --git a/include/async_mqtt/impl/endpoint_close.hpp b/include/async_mqtt/impl/endpoint_close.hpp index 24bdea909..0b79e42f4 100644 --- a/include/async_mqtt/impl/endpoint_close.hpp +++ b/include/async_mqtt/impl/endpoint_close.hpp @@ -51,7 +51,7 @@ close_op { ASYNC_MQTT_LOG("mqtt_impl", trace) << ASYNC_MQTT_ADD_VALUE(address, &a_ep) << "already close requested"; - a_ep.close_queue_.post( + a_ep.close_queue_.post( force_move(self) ); } break; diff --git a/include/async_mqtt/predefined_layer/customized_ssl_stream.hpp b/include/async_mqtt/predefined_layer/customized_ssl_stream.hpp index 3a98c479f..0be45bd8a 100644 --- a/include/async_mqtt/predefined_layer/customized_ssl_stream.hpp +++ b/include/async_mqtt/predefined_layer/customized_ssl_stream.hpp @@ -68,60 +68,41 @@ struct layer_customize> { stream.get_executor(), shutdown_timeout ); - auto self_sp = std::make_shared(force_move(self)); + auto sig = std::make_shared(); tim->async_wait( - as::consign( - as::append( - std::ref(*self_sp), - std::weak_ptr(tim) - ), - self_sp - ) + [sig, wp = std::weak_ptr(tim)] + (error_code const& ec) { + if (!ec) { + if (auto sp = wp.lock()) { + ASYNC_MQTT_LOG("mqtt_impl", info) + << "TLS async_shutdown timeout"; + sig->emit(as::cancellation_type::terminal); + } + } + } ); - stream.async_shutdown( - as::consign( - std::ref(*self_sp), - self_sp, - tim + auto& a_stream{stream}; + a_stream.async_shutdown( + as::bind_cancellation_slot( + sig->slot(), + as::consign( + force_move(self), + tim, + sig + ) ) ); } - template - void operator()( - Self& self, - error_code const& ec, - std::weak_ptr wp - ) { - if (!ec) { - if (auto sp = wp.lock()) { - ASYNC_MQTT_LOG("mqtt_impl", info) - << "TLS async_shutdown timeout"; - BOOST_ASSERT(state == shutdown); - state = complete; - self.complete(ec); - return; - } - } - ASYNC_MQTT_LOG("mqtt_impl", info) - << "TLS async_shutdown timeout doesn't processed. ec:" << ec.message(); - } - template void operator()( Self& self, error_code const& ec ) { - if (state == complete) { - ASYNC_MQTT_LOG("mqtt_impl", info) - << "TLS async_shutdown already timeout"; - } - else { - ASYNC_MQTT_LOG("mqtt_impl", info) - << "TLS async_shutdown ec:" << ec.message(); - state = complete; - self.complete(ec); - } + ASYNC_MQTT_LOG("mqtt_impl", info) + << "TLS async_shutdown ec:" << ec.message(); + state = complete; + self.complete(ec); } }; }; diff --git a/include/async_mqtt/util/impl/stream_read_packet.hpp b/include/async_mqtt/util/impl/stream_read_packet.hpp index 865cff310..7950e63a9 100644 --- a/include/async_mqtt/util/impl/stream_read_packet.hpp +++ b/include/async_mqtt/util/impl/stream_read_packet.hpp @@ -47,6 +47,7 @@ struct stream_impl::stream_read_packet_op { a_strm.read_queue_.post( force_move(self) ); + a_strm.read_queue_.try_execute(); } break; case work: { a_strm.read_queue_.start_work(); diff --git a/include/async_mqtt/util/impl/stream_write_packet.hpp b/include/async_mqtt/util/impl/stream_write_packet.hpp index 66fe29d07..70f0ed207 100644 --- a/include/async_mqtt/util/impl/stream_write_packet.hpp +++ b/include/async_mqtt/util/impl/stream_write_packet.hpp @@ -51,6 +51,7 @@ struct stream_impl::stream_write_packet_op { a_strm.write_queue_.post( force_move(self) ); + a_strm.write_queue_.try_execute(); } break; case write: { a_strm.write_queue_.start_work(); diff --git a/include/async_mqtt/util/ioc_queue.hpp b/include/async_mqtt/util/ioc_queue.hpp index 0684eb729..5632fbbe9 100644 --- a/include/async_mqtt/util/ioc_queue.hpp +++ b/include/async_mqtt/util/ioc_queue.hpp @@ -40,6 +40,9 @@ class ioc_queue { queue_, std::forward(token) ); + } + + void try_execute() { if (immediate_executable()) { queue_.restart(); queue_.poll_one();