Skip to content

Commit

Permalink
Refined close process.
Browse files Browse the repository at this point in the history
User triggered close could happen during internal close processing.
This PR fix the problem in this case.
  • Loading branch information
redboltz committed Dec 11, 2023
1 parent cddb5aa commit a03cfb7
Showing 1 changed file with 39 additions and 10 deletions.
49 changes: 39 additions & 10 deletions include/async_mqtt/endpoint.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2194,25 +2194,52 @@ class basic_endpoint {
error_code const& = error_code{}
) {
switch (state) {
case initiate: {
ASYNC_MQTT_LOG("mqtt_impl", trace)
<< ASYNC_MQTT_ADD_VALUE(address, &ep)
<< "close initiate status:" << static_cast<int>(ep.status_);
state = complete;
ep.status_ = connection_status::closing;
auto& a_ep{ep};
a_ep.stream_->close(force_move(self));
} break;
case initiate:
switch (ep.status_) {
case connection_status::connecting:
case connection_status::connected:
case connection_status::disconnecting: {
ASYNC_MQTT_LOG("mqtt_impl", trace)
<< ASYNC_MQTT_ADD_VALUE(address, &ep)
<< "close initiate status:" << static_cast<int>(ep.status_);
state = complete;
ep.status_ = connection_status::closing;
auto& a_ep{ep};
a_ep.stream_->close(force_move(self));
} break;
case connection_status::closing: {
ASYNC_MQTT_LOG("mqtt_impl", trace)
<< ASYNC_MQTT_ADD_VALUE(address, &ep)
<< "already close requested";
state = complete;
auto& a_ep{ep};
a_ep.close_queue_.post(force_move(self));
} break;
case connection_status::closed:
ASYNC_MQTT_LOG("mqtt_impl", trace)
<< ASYNC_MQTT_ADD_VALUE(address, &ep)
<< "already closed";
state = complete;
self.complete();
break;
}
break;
case complete:
BOOST_ASSERT(ep.strand().running_in_this_thread());
ASYNC_MQTT_LOG("mqtt_impl", trace)
<< ASYNC_MQTT_ADD_VALUE(address, &ep)
<< "close complete status:" << static_cast<int>(ep.status_);
<< "close complete status:" << static_cast<int>(ep.status_);
ep.tim_pingreq_send_->cancel();
ep.tim_pingreq_recv_->cancel();
ep.tim_pingresp_recv_->cancel();
ep.status_ = connection_status::closed;
self.complete();
while (!ep.close_queue_.stopped()) {
ASYNC_MQTT_LOG("mqtt_impl", trace)
<< ASYNC_MQTT_ADD_VALUE(address, &ep)
<< "process enqueued close";
ep.close_queue_.poll_one();
}
break;
}
}
Expand Down Expand Up @@ -2573,6 +2600,8 @@ class basic_endpoint {
std::set<packet_id_t> publish_recv_;
std::deque<v5::basic_publish_packet<PacketIdBytes>> publish_queue_;

ioc_queue close_queue_;

std::uint32_t maximum_packet_size_send_{packet_size_no_limit};
std::uint32_t maximum_packet_size_recv_{packet_size_no_limit};

Expand Down

0 comments on commit a03cfb7

Please sign in to comment.