Skip to content

Commit

Permalink
Merge pull request #369 from redboltz/remove_inflight_mei_treat
Browse files Browse the repository at this point in the history
Fixed invalid Message Expiry Interval.
  • Loading branch information
redboltz authored Oct 29, 2024
2 parents 26324f6 + dca8342 commit 8a07a7c
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 83 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
= History

== 9.1.0
* Fixed invalid Message Expiry Interval applying. #369
* Fixed invalid template parameter comparison. #368
* Added custom logger example. #367
* Refined documents. #364, #365
Expand Down
66 changes: 3 additions & 63 deletions include/async_mqtt/util/store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,46 +36,7 @@ class store {
if constexpr(is_publish<Packet>()) {
if (packet.opts().get_qos() == qos::at_least_once ||
packet.opts().get_qos() == qos::exactly_once) {
std::uint32_t sec = 0;
if constexpr(is_v5<Packet>()) {
bool finish = false;
for (auto const& prop : packet.props()) {
prop.visit(
overload {
[&](property::message_expiry_interval const& p) {
sec = p.val();
finish = true;
},
[](auto const&) {
}
}
);
if (finish) break;
}
}
if (sec == 0) {
return elems_.emplace_back(packet).second;
}
else {
auto tim = std::make_shared<as::steady_timer>(exe_);
tim->expires_after(std::chrono::seconds(sec));
tim->async_wait(
[this, wp = std::weak_ptr<as::steady_timer>(tim)]
(error_code const& ec) {
if (auto tim = wp.lock()) {
if (!ec) {
auto& idx = elems_.template get<tag_tim>();
auto it = idx.find(tim.get());
if (it == idx.end()) return;
ASYNC_MQTT_LOG("mqtt_impl", info)
<< "[store] message expired:" << it->packet;
idx.erase(it);
}
}
}
);
return elems_.emplace_back(packet, tim).second;
}
return elems_.emplace_back(packet).second;
}
}
else if constexpr(is_pubrel<Packet>()) {
Expand Down Expand Up @@ -120,14 +81,6 @@ class store {
std::vector<store_packet_type> ret;
ret.reserve(elems_.size());
for (auto elem : elems_) {
if (elem.tim) {
auto d =
std::chrono::duration_cast<std::chrono::seconds>(
elem.tim->expiry() - std::chrono::steady_clock::now()
).count();
if (d < 0) d = 0;
elem.packet.update_message_expiry_interval(static_cast<std::uint32_t>(d));
}
ret.push_back(force_move(elem.packet));
}
return ret;
Expand All @@ -136,9 +89,8 @@ class store {
private:
struct elem_t {
elem_t(
store_packet_type packet,
std::shared_ptr<as::steady_timer> tim = nullptr
): packet{force_move(packet)}, tim{force_move(tim)} {}
store_packet_type packet
): packet{force_move(packet)} {}

typename basic_packet_id_type<PacketIdBytes>::type packet_id() const {
return packet.packet_id();
Expand All @@ -148,16 +100,10 @@ class store {
return packet.response_packet_type();
}

void const* tim_address() const {
return tim.get();
}

store_packet_type packet;
std::shared_ptr<as::steady_timer> tim = nullptr;
};
struct tag_seq{};
struct tag_res_id{};
struct tag_tim{};
using mi_elem = mi::multi_index_container<
elem_t,
mi::indexed_by<
Expand All @@ -170,12 +116,6 @@ class store {
&elem_t::response_packet_type,
&elem_t::packet_id
>
>,
mi::hashed_non_unique<
mi::tag<tag_tim>,
mi::key<
&elem_t::tim_address
>
>
>
>;
Expand Down
23 changes: 21 additions & 2 deletions test/system/st_inflight.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -968,7 +968,9 @@ BOOST_AUTO_TEST_CASE(v5_from_broker_mei) {
)
);

// wait for broker QoS1 publish message will expire
// 2 > 1 (QoS1 message_expiry_interval)
// but PUBLISH has already been triggered
// so message shouldn't be expired
std::this_thread::sleep_for(std::chrono::seconds(2));
yield ep(sub).async_close(*this);

Expand Down Expand Up @@ -1005,6 +1007,23 @@ BOOST_AUTO_TEST_CASE(v5_from_broker_mei) {
);

yield ep(sub).async_recv(*this);
// message_expiry_interval shouldn't be updated
// because PUBLISH has already been triggered
BOOST_TEST(
pv
==
(am::v5::publish_packet{
1,
"topic1",
"payload1",
am::qos::at_least_once | am::pub::dup::yes,
{am::property::message_expiry_interval{1}}
})
);

yield ep(sub).async_recv(*this);
// message_expiry_interval shouldn't be updated
// because PUBLISH has already been triggered
BOOST_TEST(
pv
==
Expand All @@ -1013,7 +1032,7 @@ BOOST_AUTO_TEST_CASE(v5_from_broker_mei) {
"topic1",
"payload2",
am::qos::exactly_once | am::pub::dup::yes,
{am::property::message_expiry_interval{9}}
{am::property::message_expiry_interval{10}}
})
);

Expand Down
18 changes: 0 additions & 18 deletions tool/include/broker/inflight_message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,24 +46,6 @@ class inflight_message {
template <typename Epsp>
void send(Epsp& epsp) const {
std::optional<store_packet_variant> packet_opt;
if (tim_message_expiry_) {
packet_.visit(
overload {
[&](v5::basic_publish_packet<sizeof(packet_id_type)> const& m) {
auto updated_packet = m;
auto d =
std::chrono::duration_cast<std::chrono::seconds>(
tim_message_expiry_->expiry() - std::chrono::steady_clock::now()
).count();
if (d < 0) d = 0;
updated_packet.update_message_expiry_interval(static_cast<std::uint32_t>(d));
packet_opt.emplace(force_move(updated_packet));
},
[](auto const&) {
}
}
);
}
epsp.register_packet_id(packet_id());
epsp.async_send(
packet_opt ? *packet_opt : packet_,
Expand Down

0 comments on commit 8a07a7c

Please sign in to comment.