diff --git a/include/async_mqtt/client.hpp b/include/async_mqtt/client.hpp index b280b44ed..2ff2ac075 100644 --- a/include/async_mqtt/client.hpp +++ b/include/async_mqtt/client.hpp @@ -136,7 +136,7 @@ class basic_client { [this, completion_handler = force_move(completion_handler)] (auto const& se) mutable { if (se) { - force_move(completion_handler)(se.code(), nullopt); + force_move(completion_handler)(se.code(), optional{}); return; } auto tim = std::make_shared(ep_->strand()); @@ -148,7 +148,13 @@ class basic_client { (error_code const& /*ec*/) mutable { auto& idx = pid_tim_pv_res_col_.template get(); auto it =idx.find(tim); - if (it != idx.end()) { + if (it == idx.end()) { + force_move(completion_handler)( + errc::make_error_code(sys::errc::operation_canceled), + optional{} + ); + } + else { auto pv = it->pv; idx.erase(it); if (auto *p = pv->template get_if()) { @@ -157,7 +163,7 @@ class basic_client { else { force_move(completion_handler)( errc::make_error_code(sys::errc::protocol_error), - nullopt + optional{} ); } } @@ -196,7 +202,7 @@ class basic_client { [this, pid, completion_handler = force_move(completion_handler)] (auto const& se) mutable { if (se) { - force_move(completion_handler)(se.code(), nullopt); + force_move(completion_handler)(se.code(), optional{}); return; } auto tim = std::make_shared(ep_->strand()); @@ -207,7 +213,13 @@ class basic_client { (error_code const& /*ec*/) mutable { auto& idx = pid_tim_pv_res_col_.template get(); auto it = idx.find(tim); - if (it != idx.end()) { + if (it == idx.end()) { + force_move(completion_handler)( + errc::make_error_code(sys::errc::operation_canceled), + optional{} + ); + } + else { auto pv = it->pv; idx.erase(it); if (auto *p = pv->template get_if()) { @@ -216,7 +228,7 @@ class basic_client { else { force_move(completion_handler)( errc::make_error_code(sys::errc::protocol_error), - nullopt + optional{} ); } } @@ -255,7 +267,7 @@ class basic_client { [this, pid, completion_handler = force_move(completion_handler)] (auto const& se) mutable { if (se) { - force_move(completion_handler)(se.code(), nullopt); + force_move(completion_handler)(se.code(), optional{}); return; } auto tim = std::make_shared(ep_->strand()); @@ -266,7 +278,13 @@ class basic_client { (error_code const& /*ec*/) mutable { auto& idx = pid_tim_pv_res_col_.template get(); auto it = idx.find(tim); - if (it != idx.end()) { + if (it == idx.end()) { + force_move(completion_handler)( + errc::make_error_code(sys::errc::operation_canceled), + optional{} + ); + } + else { auto pv = it->pv; idx.erase(it); if (auto *p = pv->template get_if()) { @@ -275,7 +293,7 @@ class basic_client { else { force_move(completion_handler)( errc::make_error_code(sys::errc::protocol_error), - nullopt + optional{} ); } } @@ -333,7 +351,13 @@ class basic_client { (error_code const& /*ec*/) mutable { auto& idx = pid_tim_pv_res_col_.template get(); auto it = idx.find(tim); - if (it != idx.end()) { + if (it == idx.end()) { + force_move(completion_handler)( + errc::make_error_code(sys::errc::operation_canceled), + pubres_t{} + ); + } + else { auto res = it->res; idx.erase(it); force_move(completion_handler)(error_code{}, res); @@ -415,22 +439,32 @@ class basic_client { () mutable { auto call_completion_handler = [this, completion_handler = force_move(completion_handler)] - () mutable { - auto [ec, publish_opt, disconnect_opt] = recv_queue_.front(); - recv_queue_.pop_front(); - force_move(completion_handler)( - ec, - force_move(publish_opt), - force_move(disconnect_opt) - ); + (bool get_queue = true) mutable { + if (get_queue) { + auto [ec, publish_opt, disconnect_opt] = recv_queue_.front(); + recv_queue_.pop_front(); + force_move(completion_handler)( + ec, + force_move(publish_opt), + force_move(disconnect_opt) + ); + } + else { + force_move(completion_handler)( + errc::make_error_code(sys::errc::operation_canceled), + optional{}, + optional{} + ); + } }; if (recv_queue_.empty()) { + recv_queue_inserted_ = false; auto tim = std::make_shared(ep_->strand()); tim_notify_publish_recv_.expires_at(std::chrono::steady_clock::time_point::max()); tim_notify_publish_recv_.async_wait( - [call_completion_handler = force_move(call_completion_handler)] + [this, call_completion_handler = force_move(call_completion_handler)] (error_code const& /*ec*/) mutable { - call_completion_handler(); + call_completion_handler(recv_queue_inserted_); } ); } @@ -666,6 +700,7 @@ class basic_client { }, [&](publish_packet& p) { recv_queue_.emplace_back(force_move(p)); + recv_queue_inserted_ = true; tim_notify_publish_recv_.cancel(); recv_loop(); }, @@ -702,11 +737,13 @@ class basic_client { }, [&](disconnect_packet& p) { recv_queue_.emplace_back(force_move(p)); + recv_queue_inserted_ = true; tim_notify_publish_recv_.cancel(); recv_loop(); }, [&](system_error const& se) { recv_queue_.emplace_back(se.code()); + recv_queue_inserted_ = true; tim_notify_publish_recv_.cancel(); }, [&](auto const&) { @@ -776,6 +813,7 @@ class basic_client { ep_type_sp ep_; mi_pid_tim_pv_res pid_tim_pv_res_col_; std::deque recv_queue_; + bool recv_queue_inserted_ = false; as::steady_timer tim_notify_publish_recv_{ep_->strand()}; };