Skip to content

Commit

Permalink
Merge pull request #185 from redboltz/add_timer_cancel_without_data_code
Browse files Browse the repository at this point in the history
Added client's infinity timer cancelling without data arrival support.
  • Loading branch information
redboltz authored May 5, 2024
2 parents 52f1db6 + 332f758 commit e96a832
Showing 1 changed file with 58 additions and 20 deletions.
78 changes: 58 additions & 20 deletions include/async_mqtt/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ class basic_client {
[this, completion_handler = force_move(completion_handler)]
(auto const& se) mutable {
if (se) {
force_move(completion_handler)(se.code(), nullopt);
force_move(completion_handler)(se.code(), optional<connack_packet>{});
return;
}
auto tim = std::make_shared<as::steady_timer>(ep_->strand());
Expand All @@ -148,7 +148,13 @@ class basic_client {
(error_code const& /*ec*/) mutable {
auto& idx = pid_tim_pv_res_col_.template get<tag_tim>();
auto it =idx.find(tim);
if (it != idx.end()) {
if (it == idx.end()) {
force_move(completion_handler)(
errc::make_error_code(sys::errc::operation_canceled),
optional<connack_packet>{}
);
}
else {
auto pv = it->pv;
idx.erase(it);
if (auto *p = pv->template get_if<connack_packet>()) {
Expand All @@ -157,7 +163,7 @@ class basic_client {
else {
force_move(completion_handler)(
errc::make_error_code(sys::errc::protocol_error),
nullopt
optional<connack_packet>{}
);
}
}
Expand Down Expand Up @@ -196,7 +202,7 @@ class basic_client {
[this, pid, completion_handler = force_move(completion_handler)]
(auto const& se) mutable {
if (se) {
force_move(completion_handler)(se.code(), nullopt);
force_move(completion_handler)(se.code(), optional<suback_packet>{});
return;
}
auto tim = std::make_shared<as::steady_timer>(ep_->strand());
Expand All @@ -207,7 +213,13 @@ class basic_client {
(error_code const& /*ec*/) mutable {
auto& idx = pid_tim_pv_res_col_.template get<tag_tim>();
auto it = idx.find(tim);
if (it != idx.end()) {
if (it == idx.end()) {
force_move(completion_handler)(
errc::make_error_code(sys::errc::operation_canceled),
optional<suback_packet>{}
);
}
else {
auto pv = it->pv;
idx.erase(it);
if (auto *p = pv->template get_if<suback_packet>()) {
Expand All @@ -216,7 +228,7 @@ class basic_client {
else {
force_move(completion_handler)(
errc::make_error_code(sys::errc::protocol_error),
nullopt
optional<suback_packet>{}
);
}
}
Expand Down Expand Up @@ -255,7 +267,7 @@ class basic_client {
[this, pid, completion_handler = force_move(completion_handler)]
(auto const& se) mutable {
if (se) {
force_move(completion_handler)(se.code(), nullopt);
force_move(completion_handler)(se.code(), optional<unsuback_packet>{});
return;
}
auto tim = std::make_shared<as::steady_timer>(ep_->strand());
Expand All @@ -266,7 +278,13 @@ class basic_client {
(error_code const& /*ec*/) mutable {
auto& idx = pid_tim_pv_res_col_.template get<tag_tim>();
auto it = idx.find(tim);
if (it != idx.end()) {
if (it == idx.end()) {
force_move(completion_handler)(
errc::make_error_code(sys::errc::operation_canceled),
optional<unsuback_packet>{}
);
}
else {
auto pv = it->pv;
idx.erase(it);
if (auto *p = pv->template get_if<unsuback_packet>()) {
Expand All @@ -275,7 +293,7 @@ class basic_client {
else {
force_move(completion_handler)(
errc::make_error_code(sys::errc::protocol_error),
nullopt
optional<unsuback_packet>{}
);
}
}
Expand Down Expand Up @@ -333,7 +351,13 @@ class basic_client {
(error_code const& /*ec*/) mutable {
auto& idx = pid_tim_pv_res_col_.template get<tag_tim>();
auto it = idx.find(tim);
if (it != idx.end()) {
if (it == idx.end()) {
force_move(completion_handler)(
errc::make_error_code(sys::errc::operation_canceled),
pubres_t{}
);
}
else {
auto res = it->res;
idx.erase(it);
force_move(completion_handler)(error_code{}, res);
Expand Down Expand Up @@ -415,22 +439,32 @@ class basic_client {
() mutable {
auto call_completion_handler =
[this, completion_handler = force_move(completion_handler)]
() mutable {
auto [ec, publish_opt, disconnect_opt] = recv_queue_.front();
recv_queue_.pop_front();
force_move(completion_handler)(
ec,
force_move(publish_opt),
force_move(disconnect_opt)
);
(bool get_queue = true) mutable {
if (get_queue) {
auto [ec, publish_opt, disconnect_opt] = recv_queue_.front();
recv_queue_.pop_front();
force_move(completion_handler)(
ec,
force_move(publish_opt),
force_move(disconnect_opt)
);
}
else {
force_move(completion_handler)(
errc::make_error_code(sys::errc::operation_canceled),
optional<publish_packet>{},
optional<disconnect_packet>{}
);
}
};
if (recv_queue_.empty()) {
recv_queue_inserted_ = false;
auto tim = std::make_shared<as::steady_timer>(ep_->strand());
tim_notify_publish_recv_.expires_at(std::chrono::steady_clock::time_point::max());
tim_notify_publish_recv_.async_wait(
[call_completion_handler = force_move(call_completion_handler)]
[this, call_completion_handler = force_move(call_completion_handler)]
(error_code const& /*ec*/) mutable {
call_completion_handler();
call_completion_handler(recv_queue_inserted_);
}
);
}
Expand Down Expand Up @@ -666,6 +700,7 @@ class basic_client {
},
[&](publish_packet& p) {
recv_queue_.emplace_back(force_move(p));
recv_queue_inserted_ = true;
tim_notify_publish_recv_.cancel();
recv_loop();
},
Expand Down Expand Up @@ -702,11 +737,13 @@ class basic_client {
},
[&](disconnect_packet& p) {
recv_queue_.emplace_back(force_move(p));
recv_queue_inserted_ = true;
tim_notify_publish_recv_.cancel();
recv_loop();
},
[&](system_error const& se) {
recv_queue_.emplace_back(se.code());
recv_queue_inserted_ = true;
tim_notify_publish_recv_.cancel();
},
[&](auto const&) {
Expand Down Expand Up @@ -776,6 +813,7 @@ class basic_client {
ep_type_sp ep_;
mi_pid_tim_pv_res pid_tim_pv_res_col_;
std::deque<recv_t> recv_queue_;
bool recv_queue_inserted_ = false;
as::steady_timer tim_notify_publish_recv_{ep_->strand()};
};

Expand Down

0 comments on commit e96a832

Please sign in to comment.