Skip to content

Commit

Permalink
Added workaround for async_shutdown().
Browse files Browse the repository at this point in the history
When counterpart socket closed immediately, TLS async_shutdown()
CompletionToken never called. It very rarely happened.
This fix added timeout for that.
If timer is fired, then TCP layer closed forcibly.
  • Loading branch information
redboltz committed Dec 5, 2023
1 parent 1d00a9d commit 656dbb2
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 15 deletions.
6 changes: 6 additions & 0 deletions include/async_mqtt/endpoint.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2195,13 +2195,19 @@ class basic_endpoint {
) {
switch (state) {
case initiate: {
ASYNC_MQTT_LOG("mqtt_impl", trace)
<< ASYNC_MQTT_ADD_VALUE(address, &ep)
<< "close initiate status:" << static_cast<int>(ep.status_) << std::endl;
state = complete;
ep.status_ = connection_status::closing;
auto& a_ep{ep};
a_ep.stream_->close(force_move(self));
} break;
case complete:
BOOST_ASSERT(ep.strand().running_in_this_thread());
ASYNC_MQTT_LOG("mqtt_impl", trace)
<< ASYNC_MQTT_ADD_VALUE(address, &ep)
<< "close complete status:" << static_cast<int>(ep.status_) << std::endl;
ep.tim_pingreq_send_->cancel();
ep.tim_pingreq_recv_->cancel();
ep.tim_pingresp_recv_->cancel();
Expand Down
76 changes: 61 additions & 15 deletions include/async_mqtt/stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <async_mqtt/util/optional.hpp>
#include <async_mqtt/util/static_vector.hpp>
#include <async_mqtt/buffer.hpp>
#include <async_mqtt/constant.hpp>
#include <async_mqtt/is_strand.hpp>
#include <async_mqtt/ws_fixed_size_async_read.hpp>
#include <async_mqtt/exception.hpp>
Expand Down Expand Up @@ -405,20 +406,34 @@ class stream : public std::enable_shared_from_this<stream<NextLayer>> {
} break;
case write: {
BOOST_ASSERT(strm.strand_.running_in_this_thread());
state = bind;
BOOST_ASSERT(!strm.writing_);
strm.writing_ = true;
queue_work_guard.emplace(strm.queue_->get_executor());
auto& a_strm{strm};
auto cbs = packet->const_buffer_sequence();
async_write(
a_strm.nl_,
cbs,
as::bind_executor(
if (strm.lowest_layer().is_open()) {
state = bind;
BOOST_ASSERT(!strm.writing_);
strm.writing_ = true;
queue_work_guard.emplace(strm.queue_->get_executor());
auto& a_strm{strm};
auto cbs = packet->const_buffer_sequence();
async_write(
a_strm.nl_,
cbs,
as::bind_executor(
a_strm.strand_,
force_move(self)
)
);
}
else {
state = bind;
auto& a_strm{strm};
as::dispatch(
a_strm.strand_,
force_move(self)
)
);
as::append(
force_move(self),
errc::make_error_code(errc::connection_reset),
0
)
);
}
} break;
default:
BOOST_ASSERT(false);
Expand Down Expand Up @@ -620,11 +635,32 @@ class stream : public std::enable_shared_from_this<stream<NextLayer>> {
}
else if constexpr(is_tls<Stream>::value) {
auto& a_strm{strm};
ASYNC_MQTT_LOG("mqtt_impl", info)
<< ASYNC_MQTT_ADD_VALUE(address, this)
<< "TLS async_shutdown start with timeout";
auto tim = std::make_shared<as::steady_timer>(a_strm.strand_, shutdown_timeout);
tim->async_wait(
as::bind_executor(
a_strm.strand_,
[this, &next_layer = stream.get().next_layer()] (error_code const& ec) {
if (!ec) {
ASYNC_MQTT_LOG("mqtt_impl", info)
<< ASYNC_MQTT_ADD_VALUE(address, this)
<< "TLS async_shutdown timeout";
error_code ec;
next_layer.close(ec);
}
}
)
);
stream.get().async_shutdown(
as::bind_executor(
a_strm.strand_,
as::append(
force_move(self),
as::consign(
force_move(self),
tim
),
std::ref(stream.get().next_layer())
)
)
Expand All @@ -633,7 +669,17 @@ class stream : public std::enable_shared_from_this<stream<NextLayer>> {
else {
state = complete;
error_code ec;
stream.get().close(ec);
if (stream.get().is_open()) {
ASYNC_MQTT_LOG("mqtt_impl", info)
<< ASYNC_MQTT_ADD_VALUE(address, this)
<< "TCP close";
stream.get().close(ec);
}
else {
ASYNC_MQTT_LOG("mqtt_impl", info)
<< ASYNC_MQTT_ADD_VALUE(address, this)
<< "TCP already closed";
}
auto exe = as::get_associated_executor(self);
if constexpr (is_strand<std::decay_t<decltype(exe)>>()) {
state = complete;
Expand Down

0 comments on commit 656dbb2

Please sign in to comment.