Skip to content

Commit

Permalink
Fixed TLS async_close timeout logic.
Browse files Browse the repository at this point in the history
  • Loading branch information
redboltz committed Oct 8, 2024
1 parent 5b029eb commit 7b9b2a3
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 44 deletions.
2 changes: 1 addition & 1 deletion include/async_mqtt/impl/endpoint_close.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ close_op {
ASYNC_MQTT_LOG("mqtt_impl", trace)
<< ASYNC_MQTT_ADD_VALUE(address, &a_ep)
<< "already close requested";
a_ep.close_queue_.post(
a_ep.close_queue_.post(
force_move(self)
);
} break;
Expand Down
67 changes: 24 additions & 43 deletions include/async_mqtt/predefined_layer/customized_ssl_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,60 +68,41 @@ struct layer_customize<as::ssl::stream<NextLayer>> {
stream.get_executor(),
shutdown_timeout
);
auto self_sp = std::make_shared<Self>(force_move(self));
auto sig = std::make_shared<as::cancellation_signal>();
tim->async_wait(
as::consign(
as::append(
std::ref(*self_sp),
std::weak_ptr<as::steady_timer>(tim)
),
self_sp
)
[sig, wp = std::weak_ptr<as::steady_timer>(tim)]
(error_code const& ec) {
if (!ec) {
if (auto sp = wp.lock()) {
ASYNC_MQTT_LOG("mqtt_impl", info)
<< "TLS async_shutdown timeout";
sig->emit(as::cancellation_type::terminal);
}
}
}
);
stream.async_shutdown(
as::consign(
std::ref(*self_sp),
self_sp,
tim
auto& a_stream{stream};
a_stream.async_shutdown(
as::bind_cancellation_slot(
sig->slot(),
as::consign(
force_move(self),
tim,
sig
)
)
);
}

template <typename Self>
void operator()(
Self& self,
error_code const& ec,
std::weak_ptr<as::steady_timer> wp
) {
if (!ec) {
if (auto sp = wp.lock()) {
ASYNC_MQTT_LOG("mqtt_impl", info)
<< "TLS async_shutdown timeout";
BOOST_ASSERT(state == shutdown);
state = complete;
self.complete(ec);
return;
}
}
ASYNC_MQTT_LOG("mqtt_impl", info)
<< "TLS async_shutdown timeout doesn't processed. ec:" << ec.message();
}

template <typename Self>
void operator()(
Self& self,
error_code const& ec
) {
if (state == complete) {
ASYNC_MQTT_LOG("mqtt_impl", info)
<< "TLS async_shutdown already timeout";
}
else {
ASYNC_MQTT_LOG("mqtt_impl", info)
<< "TLS async_shutdown ec:" << ec.message();
state = complete;
self.complete(ec);
}
ASYNC_MQTT_LOG("mqtt_impl", info)
<< "TLS async_shutdown ec:" << ec.message();
state = complete;
self.complete(ec);
}
};
};
Expand Down
1 change: 1 addition & 0 deletions include/async_mqtt/util/impl/stream_read_packet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ struct stream_impl<NextLayer>::stream_read_packet_op {
a_strm.read_queue_.post(
force_move(self)
);
a_strm.read_queue_.try_execute();
} break;
case work: {
a_strm.read_queue_.start_work();
Expand Down
1 change: 1 addition & 0 deletions include/async_mqtt/util/impl/stream_write_packet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ struct stream_impl<NextLayer>::stream_write_packet_op {
a_strm.write_queue_.post(
force_move(self)
);
a_strm.write_queue_.try_execute();
} break;
case write: {
a_strm.write_queue_.start_work();
Expand Down
3 changes: 3 additions & 0 deletions include/async_mqtt/util/ioc_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ class ioc_queue {
queue_,
std::forward<CompletionToken>(token)
);
}

void try_execute() {
if (immediate_executable()) {
queue_.restart();
queue_.poll_one();
Expand Down

0 comments on commit 7b9b2a3

Please sign in to comment.