diff --git a/include/async_mqtt/endpoint.hpp b/include/async_mqtt/endpoint.hpp index e9383bcd8..dba6bc4ec 100644 --- a/include/async_mqtt/endpoint.hpp +++ b/include/async_mqtt/endpoint.hpp @@ -2194,24 +2194,51 @@ class basic_endpoint { error_code const& = error_code{} ) { switch (state) { - case initiate: { - ASYNC_MQTT_LOG("mqtt_impl", trace) - << ASYNC_MQTT_ADD_VALUE(address, &ep) - << "close initiate status:" << static_cast(ep.status_); - state = complete; - ep.status_ = connection_status::closing; - auto& a_ep{ep}; - a_ep.stream_->close(force_move(self)); - } break; + case initiate: + switch (ep.status_) { + case connection_status::connecting: + case connection_status::connected: + case connection_status::disconnecting: { + ASYNC_MQTT_LOG("mqtt_impl", trace) + << ASYNC_MQTT_ADD_VALUE(address, &ep) + << "close initiate status:" << static_cast(ep.status_); + state = complete; + ep.status_ = connection_status::closing; + auto& a_ep{ep}; + a_ep.stream_->close(force_move(self)); + } break; + case connection_status::closing: { + ASYNC_MQTT_LOG("mqtt_impl", trace) + << ASYNC_MQTT_ADD_VALUE(address, &ep) + << "already close requested"; + state = complete; + auto& a_ep{ep}; + a_ep.close_queue_.post(force_move(self)); + } break; + case connection_status::closed: + ASYNC_MQTT_LOG("mqtt_impl", trace) + << ASYNC_MQTT_ADD_VALUE(address, &ep) + << "already closed"; + state = complete; + self.complete(); + break; + } + break; case complete: BOOST_ASSERT(ep.strand().running_in_this_thread()); ASYNC_MQTT_LOG("mqtt_impl", trace) << ASYNC_MQTT_ADD_VALUE(address, &ep) - << "close complete status:" << static_cast(ep.status_); + << "close complete status:" << static_cast(ep.status_); ep.tim_pingreq_send_->cancel(); ep.tim_pingreq_recv_->cancel(); ep.tim_pingresp_recv_->cancel(); ep.status_ = connection_status::closed; + while (!ep.close_queue_.stopped()) { + ASYNC_MQTT_LOG("mqtt_impl", trace) + << ASYNC_MQTT_ADD_VALUE(address, &ep) + << "process enqueued close"; + ep.close_queue_.poll_one(); + } self.complete(); break; } @@ -2573,6 +2600,8 @@ class basic_endpoint { std::set publish_recv_; std::deque> publish_queue_; + ioc_queue close_queue_; + std::uint32_t maximum_packet_size_send_{packet_size_no_limit}; std::uint32_t maximum_packet_size_recv_{packet_size_no_limit}; diff --git a/include/async_mqtt/stream.hpp b/include/async_mqtt/stream.hpp index 00148a829..8ca934055 100644 --- a/include/async_mqtt/stream.hpp +++ b/include/async_mqtt/stream.hpp @@ -514,6 +514,7 @@ class stream : public std::enable_shared_from_this> { complete } state = dispatch; error_code last_ec = error_code{}; + this_type_sp life_keeper = strm.shared_from_this(); template void operator()(