diff --git a/CHANGELOG.adoc b/CHANGELOG.adoc
index 3c0658c60..bb7d7ae62 100644
--- a/CHANGELOG.adoc
+++ b/CHANGELOG.adoc
@@ -3,6 +3,7 @@
= History
== 9.0.2
+* Fixed TLS timeout logic. #357
* Fixed broker auth file for docker. #356
== 9.0.1
diff --git a/doc/CHANGELOG.html b/doc/CHANGELOG.html
index f1fda449b..060192746 100644
--- a/doc/CHANGELOG.html
+++ b/doc/CHANGELOG.html
@@ -69,6 +69,9 @@
9.0.2
diff --git a/doc/api/customized__ssl__stream_8hpp_source.html b/doc/api/customized__ssl__stream_8hpp_source.html
index c56aad4e2..ce3940057 100644
--- a/doc/api/customized__ssl__stream_8hpp_source.html
+++ b/doc/api/customized__ssl__stream_8hpp_source.html
@@ -162,68 +162,49 @@
68 stream.get_executor(),
-
71 auto self_sp = std::make_shared<Self>(force_move(self));
+
71 auto sig = std::make_shared<as::cancellation_signal>();
-
-
-
-
76 std::weak_ptr<as::steady_timer>(tim)
-
-
-
-
-
81 stream.async_shutdown(
-
-
-
-
-
-
-
-
-
90 template <
typename Self>
-
-
-
-
94 std::weak_ptr<as::steady_timer> wp
-
-
-
97 if (
auto sp = wp.lock()) {
-
98 ASYNC_MQTT_LOG(
"mqtt_impl",
info)
-
99 <<
"TLS async_shutdown timeout";
-
100 BOOST_ASSERT(state == shutdown);
-
-
-
-
-
-
106 ASYNC_MQTT_LOG(
"mqtt_impl",
info)
-
107 <<
"TLS async_shutdown timeout doesn't processed. ec:" << ec.message();
-
-
-
110 template <
typename Self>
-
-
-
-
-
115 if (state == complete) {
-
116 ASYNC_MQTT_LOG(
"mqtt_impl",
info)
-
117 <<
"TLS async_shutdown already timeout";
-
-
-
120 ASYNC_MQTT_LOG(
"mqtt_impl",
info)
-
121 <<
"TLS async_shutdown ec:" << ec.message();
-
-
-
-
-
-
+
73 [sig, wp = std::weak_ptr<as::steady_timer>(tim)]
+
+
+
76 if (
auto sp = wp.lock()) {
+
77 ASYNC_MQTT_LOG(
"mqtt_impl",
info)
+
78 <<
"TLS async_shutdown timeout";
+
79 sig->emit(as::cancellation_type::terminal);
+
+
+
+
+
84 auto& a_stream{stream};
+
85 a_stream.async_shutdown(
+
86 as::bind_cancellation_slot(
+
+
+
+
+
+
+
+
+
+
+
97 template <
typename Self>
+
+
+
+
+
102 ASYNC_MQTT_LOG(
"mqtt_impl",
info)
+
103 <<
"TLS async_shutdown ec:" << ec.message();
+
+
+
+
+
-
-
-
-
+
+
+
+
sys::error_code error_code
sys is a namespace alias of boost::sytem.
Definition error.hpp:56
@ info
info level api call is output
customization class template for underlying layer In order to adapt your layer to async_mqtt,...
Definition stream_traits.hpp:101
diff --git a/doc/api/ioc__queue_8hpp_source.html b/doc/api/ioc__queue_8hpp_source.html
index f57f4816c..0f9dbea65 100644
--- a/doc/api/ioc__queue_8hpp_source.html
+++ b/doc/api/ioc__queue_8hpp_source.html
@@ -142,37 +142,40 @@
41 std::forward<CompletionToken>(token)
- 43 if (immediate_executable()) {
-
-
-
-
-
- 49 bool stopped()
const {
- 50 return queue_.stopped();
-
-
- 53 std::size_t poll_one() {
-
- 55 if (queue_.stopped()) queue_.restart();
- 56 return queue_.poll_one();
-
-
-
-
- 61 if (queue_.stopped()) queue_.restart();
-
-
-
-
- 66 as::io_context queue_{BOOST_ASIO_CONCURRENCY_HINT_UNSAFE};
- 67 bool working_ =
false;
- 68 std::optional<as::executor_work_guard<as::io_context::executor_type>> guard_;
-
-
-
-
-
+
+
+
+ 46 if (immediate_executable()) {
+
+
+
+
+
+ 52 bool stopped()
const {
+ 53 return queue_.stopped();
+
+
+ 56 std::size_t poll_one() {
+
+ 58 if (queue_.stopped()) queue_.restart();
+ 59 return queue_.poll_one();
+
+
+
+
+ 64 if (queue_.stopped()) queue_.restart();
+
+
+
+
+ 69 as::io_context queue_{BOOST_ASIO_CONCURRENCY_HINT_UNSAFE};
+ 70 bool working_ =
false;
+ 71 std::optional<as::executor_work_guard<as::io_context::executor_type>> guard_;
+
+
+
+
+
diff --git a/include/async_mqtt/impl/endpoint_close.hpp b/include/async_mqtt/impl/endpoint_close.hpp
index 24bdea909..0b79e42f4 100644
--- a/include/async_mqtt/impl/endpoint_close.hpp
+++ b/include/async_mqtt/impl/endpoint_close.hpp
@@ -51,7 +51,7 @@ close_op {
ASYNC_MQTT_LOG("mqtt_impl", trace)
<< ASYNC_MQTT_ADD_VALUE(address, &a_ep)
<< "already close requested";
- a_ep.close_queue_.post(
+ a_ep.close_queue_.post(
force_move(self)
);
} break;
diff --git a/include/async_mqtt/predefined_layer/customized_ssl_stream.hpp b/include/async_mqtt/predefined_layer/customized_ssl_stream.hpp
index 3a98c479f..0be45bd8a 100644
--- a/include/async_mqtt/predefined_layer/customized_ssl_stream.hpp
+++ b/include/async_mqtt/predefined_layer/customized_ssl_stream.hpp
@@ -68,60 +68,41 @@ struct layer_customize> {
stream.get_executor(),
shutdown_timeout
);
- auto self_sp = std::make_shared(force_move(self));
+ auto sig = std::make_shared();
tim->async_wait(
- as::consign(
- as::append(
- std::ref(*self_sp),
- std::weak_ptr(tim)
- ),
- self_sp
- )
+ [sig, wp = std::weak_ptr(tim)]
+ (error_code const& ec) {
+ if (!ec) {
+ if (auto sp = wp.lock()) {
+ ASYNC_MQTT_LOG("mqtt_impl", info)
+ << "TLS async_shutdown timeout";
+ sig->emit(as::cancellation_type::terminal);
+ }
+ }
+ }
);
- stream.async_shutdown(
- as::consign(
- std::ref(*self_sp),
- self_sp,
- tim
+ auto& a_stream{stream};
+ a_stream.async_shutdown(
+ as::bind_cancellation_slot(
+ sig->slot(),
+ as::consign(
+ force_move(self),
+ tim,
+ sig
+ )
)
);
}
- template
- void operator()(
- Self& self,
- error_code const& ec,
- std::weak_ptr wp
- ) {
- if (!ec) {
- if (auto sp = wp.lock()) {
- ASYNC_MQTT_LOG("mqtt_impl", info)
- << "TLS async_shutdown timeout";
- BOOST_ASSERT(state == shutdown);
- state = complete;
- self.complete(ec);
- return;
- }
- }
- ASYNC_MQTT_LOG("mqtt_impl", info)
- << "TLS async_shutdown timeout doesn't processed. ec:" << ec.message();
- }
-
template
void operator()(
Self& self,
error_code const& ec
) {
- if (state == complete) {
- ASYNC_MQTT_LOG("mqtt_impl", info)
- << "TLS async_shutdown already timeout";
- }
- else {
- ASYNC_MQTT_LOG("mqtt_impl", info)
- << "TLS async_shutdown ec:" << ec.message();
- state = complete;
- self.complete(ec);
- }
+ ASYNC_MQTT_LOG("mqtt_impl", info)
+ << "TLS async_shutdown ec:" << ec.message();
+ state = complete;
+ self.complete(ec);
}
};
};
diff --git a/include/async_mqtt/util/impl/stream_read_packet.hpp b/include/async_mqtt/util/impl/stream_read_packet.hpp
index 865cff310..7950e63a9 100644
--- a/include/async_mqtt/util/impl/stream_read_packet.hpp
+++ b/include/async_mqtt/util/impl/stream_read_packet.hpp
@@ -47,6 +47,7 @@ struct stream_impl::stream_read_packet_op {
a_strm.read_queue_.post(
force_move(self)
);
+ a_strm.read_queue_.try_execute();
} break;
case work: {
a_strm.read_queue_.start_work();
diff --git a/include/async_mqtt/util/impl/stream_write_packet.hpp b/include/async_mqtt/util/impl/stream_write_packet.hpp
index 66fe29d07..70f0ed207 100644
--- a/include/async_mqtt/util/impl/stream_write_packet.hpp
+++ b/include/async_mqtt/util/impl/stream_write_packet.hpp
@@ -51,6 +51,7 @@ struct stream_impl::stream_write_packet_op {
a_strm.write_queue_.post(
force_move(self)
);
+ a_strm.write_queue_.try_execute();
} break;
case write: {
a_strm.write_queue_.start_work();
diff --git a/include/async_mqtt/util/ioc_queue.hpp b/include/async_mqtt/util/ioc_queue.hpp
index 0684eb729..5632fbbe9 100644
--- a/include/async_mqtt/util/ioc_queue.hpp
+++ b/include/async_mqtt/util/ioc_queue.hpp
@@ -40,6 +40,9 @@ class ioc_queue {
queue_,
std::forward(token)
);
+ }
+
+ void try_execute() {
if (immediate_executable()) {
queue_.restart();
queue_.poll_one();