diff --git a/include/async_mqtt/endpoint.hpp b/include/async_mqtt/endpoint.hpp index 520e06b2b..1bc50c118 100644 --- a/include/async_mqtt/endpoint.hpp +++ b/include/async_mqtt/endpoint.hpp @@ -2195,6 +2195,9 @@ class basic_endpoint { ) { switch (state) { case initiate: { + ASYNC_MQTT_LOG("mqtt_impl", trace) + << ASYNC_MQTT_ADD_VALUE(address, &ep) + << "close initiate status:" << static_cast(ep.status_) << std::endl; state = complete; ep.status_ = connection_status::closing; auto& a_ep{ep}; @@ -2202,6 +2205,9 @@ class basic_endpoint { } break; case complete: BOOST_ASSERT(ep.strand().running_in_this_thread()); + ASYNC_MQTT_LOG("mqtt_impl", trace) + << ASYNC_MQTT_ADD_VALUE(address, &ep) + << "close complete status:" << static_cast(ep.status_) << std::endl; ep.tim_pingreq_send_->cancel(); ep.tim_pingreq_recv_->cancel(); ep.tim_pingresp_recv_->cancel(); diff --git a/include/async_mqtt/stream.hpp b/include/async_mqtt/stream.hpp index e0a0f71c9..b511238d3 100644 --- a/include/async_mqtt/stream.hpp +++ b/include/async_mqtt/stream.hpp @@ -28,6 +28,7 @@ #include #include #include +#include #include #include #include @@ -405,20 +406,34 @@ class stream : public std::enable_shared_from_this> { } break; case write: { BOOST_ASSERT(strm.strand_.running_in_this_thread()); - state = bind; - BOOST_ASSERT(!strm.writing_); - strm.writing_ = true; - queue_work_guard.emplace(strm.queue_->get_executor()); - auto& a_strm{strm}; - auto cbs = packet->const_buffer_sequence(); - async_write( - a_strm.nl_, - cbs, - as::bind_executor( + if (strm.lowest_layer().is_open()) { + state = bind; + BOOST_ASSERT(!strm.writing_); + strm.writing_ = true; + queue_work_guard.emplace(strm.queue_->get_executor()); + auto& a_strm{strm}; + auto cbs = packet->const_buffer_sequence(); + async_write( + a_strm.nl_, + cbs, + as::bind_executor( + a_strm.strand_, + force_move(self) + ) + ); + } + else { + state = bind; + auto& a_strm{strm}; + as::dispatch( a_strm.strand_, - force_move(self) - ) - ); + as::append( + force_move(self), + errc::make_error_code(errc::connection_reset), + 0 + ) + ); + } } break; default: BOOST_ASSERT(false); @@ -620,11 +635,32 @@ class stream : public std::enable_shared_from_this> { } else if constexpr(is_tls::value) { auto& a_strm{strm}; + ASYNC_MQTT_LOG("mqtt_impl", info) + << ASYNC_MQTT_ADD_VALUE(address, this) + << "TLS async_shutdown start with timeout"; + auto tim = std::make_shared(a_strm.strand_, shutdown_timeout); + tim->async_wait( + as::bind_executor( + a_strm.strand_, + [this, &next_layer = stream.get().next_layer()] (error_code const& ec) { + if (!ec) { + ASYNC_MQTT_LOG("mqtt_impl", info) + << ASYNC_MQTT_ADD_VALUE(address, this) + << "TLS async_shutdown timeout"; + error_code ec; + next_layer.close(ec); + } + } + ) + ); stream.get().async_shutdown( as::bind_executor( a_strm.strand_, as::append( - force_move(self), + as::consign( + force_move(self), + tim + ), std::ref(stream.get().next_layer()) ) ) @@ -633,7 +669,17 @@ class stream : public std::enable_shared_from_this> { else { state = complete; error_code ec; - stream.get().close(ec); + if (stream.get().is_open()) { + ASYNC_MQTT_LOG("mqtt_impl", info) + << ASYNC_MQTT_ADD_VALUE(address, this) + << "TCP close"; + stream.get().close(ec); + } + else { + ASYNC_MQTT_LOG("mqtt_impl", info) + << ASYNC_MQTT_ADD_VALUE(address, this) + << "TCP already closed"; + } auto exe = as::get_associated_executor(self); if constexpr (is_strand>()) { state = complete;