Skip to content

Commit

Permalink
Merge pull request #136 from redboltz/fix_pubrel_serialize
Browse files Browse the repository at this point in the history
Fixed pubrel serialize.
  • Loading branch information
redboltz authored Apr 18, 2018
2 parents 4f4a8ce + a02c468 commit 965ef66
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 23 deletions.
4 changes: 1 addition & 3 deletions include/mqtt/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,6 @@ class client : public endpoint<Socket> {
* When the endpoint disconnects using disconnect(), a will won't send.<BR>
* See http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718090<BR>
* @param timeout after timeout elapsed, force_disconnect() is automatically called.
* .
*/
void disconnect(boost::posix_time::time_duration const& timeout) {
if (ping_duration_ms_ != 0) tim_ping_.cancel();
Expand All @@ -335,8 +334,7 @@ class client : public endpoint<Socket> {
* Send a disconnect packet to the connected broker. It is a clean disconnecting sequence.
* The broker disconnects the endpoint after receives the disconnect packet.<BR>
* When the endpoint disconnects using disconnect(), a will won't send.<BR>
* See http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718090<BR>
* .
* See http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718090<BR>.
*/
void disconnect() {
if (ping_duration_ms_ != 0) tim_ping_.cancel();
Expand Down
25 changes: 21 additions & 4 deletions include/mqtt/endpoint.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3099,6 +3099,9 @@ class endpoint : public std::enable_shared_from_this<endpoint<Socket, Mutex, Loc
* A topic name to publish
* @param contents
* The contents to publish
* @param life_keeper
* The function for keeping topic_name and contents life.
* It is usually a lambda expression that captures shared_ptr of topic_name and contents.
* @param retain
* A retain flag. If set it to true, the contents is retained.<BR>
* See http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718038<BR>
Expand Down Expand Up @@ -3924,15 +3927,20 @@ class endpoint : public std::enable_shared_from_this<endpoint<Socket, Mutex, Loc
restore_serialized_message(*sp, [sp] {});
} break;
case control_packet_type::pubrel: {
auto sp = std::make_shared<pubrel_message>(b, e);
restore_serialized_message(*sp, [sp] {});
restore_serialized_message(pubrel_message(b, e));
} break;
default:
throw protocol_error();
break;
}
}

/**
* @brief Restore serialized publish message.
* This function shouold be called before connect.
* @param msg publish message.
* @param life_keeper the function that keeps the msg lifetime.
*/
void restore_serialized_message(publish_message msg, life_keeper_t life_keeper) {
auto packet_id = msg.packet_id();
auto qos = msg.qos();
Expand All @@ -3948,19 +3956,28 @@ class endpoint : public std::enable_shared_from_this<endpoint<Socket, Mutex, Loc
}
}

void restore_serialized_message(pubrel_message msg, life_keeper_t life_keeper) {
/**
* @brief Restore serialized pubrel message.
* This function shouold be called before connect.
* @param msg pubrel message.
*/
void restore_serialized_message(pubrel_message msg) {
auto packet_id = msg.packet_id();
LockGuard<Mutex> lck (store_mtx_);
if (packet_id_.insert(packet_id).second) {
store_.emplace(
packet_id,
control_packet_type::pubcomp,
std::move(msg),
std::move(life_keeper)
[]{}
);
}
}

/**
* @brief Check connection status
* @returrn current connection status
*/
bool connected() const {
return connected_ && mqtt_connected_;
}
Expand Down
44 changes: 28 additions & 16 deletions test/resend_serialize.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ BOOST_AUTO_TEST_CASE( publish_qos1 ) {
serialized.emplace(msg.packet_id(), std::make_tuple(true, msg.continuous_buffer()));
},
[&serialized](mqtt::pubrel_message msg) {
serialized.emplace(msg.packet_id(), std::make_tuple(true, msg.continuous_buffer()));
BOOST_CHECK(serialized.find(msg.packet_id()) != serialized.end());
serialized[msg.packet_id()] = std::make_tuple(false, msg.continuous_buffer());
},
[&serialized](std::uint16_t packet_id) {
BOOST_CHECK(serialized.find(packet_id) != serialized.end());
serialized.erase(packet_id);
}
);
Expand All @@ -47,9 +49,11 @@ BOOST_AUTO_TEST_CASE( publish_qos1 ) {
serialized.emplace(msg.packet_id(), std::make_tuple(true, msg.continuous_buffer()));
},
[&serialized](mqtt::pubrel_message msg) {
serialized.emplace(msg.packet_id(), std::make_tuple(true, msg.continuous_buffer()));
BOOST_CHECK(serialized.find(msg.packet_id()) != serialized.end());
serialized[msg.packet_id()] = std::make_tuple(false, msg.continuous_buffer());
},
[&serialized](std::uint16_t packet_id) {
BOOST_CHECK(serialized.find(packet_id) != serialized.end());
serialized.erase(packet_id);
}
);
Expand Down Expand Up @@ -142,8 +146,7 @@ BOOST_AUTO_TEST_CASE( publish_qos1 ) {
else {
// pubrel
c2->restore_serialized_message(
mqtt::publish_message(packet.begin(), packet.end()),
[]{}
mqtt::pubrel_message(packet.begin(), packet.end())
);
}
}
Expand Down Expand Up @@ -222,9 +225,11 @@ BOOST_AUTO_TEST_CASE( publish_qos2 ) {
serialized.emplace(msg.packet_id(), std::make_tuple(true, msg.continuous_buffer()));
},
[&serialized](mqtt::pubrel_message msg) {
serialized.emplace(msg.packet_id(), std::make_tuple(true, msg.continuous_buffer()));
BOOST_CHECK(serialized.find(msg.packet_id()) != serialized.end());
serialized[msg.packet_id()] = std::make_tuple(false, msg.continuous_buffer());
},
[&serialized](std::uint16_t packet_id) {
BOOST_CHECK(serialized.find(packet_id) != serialized.end());
serialized.erase(packet_id);
}
);
Expand All @@ -234,9 +239,11 @@ BOOST_AUTO_TEST_CASE( publish_qos2 ) {
serialized.emplace(msg.packet_id(), std::make_tuple(true, msg.continuous_buffer()));
},
[&serialized](mqtt::pubrel_message msg) {
serialized.emplace(msg.packet_id(), std::make_tuple(true, msg.continuous_buffer()));
BOOST_CHECK(serialized.find(msg.packet_id()) != serialized.end());
serialized[msg.packet_id()] = std::make_tuple(false, msg.continuous_buffer());
},
[&serialized](std::uint16_t packet_id) {
BOOST_CHECK(serialized.find(packet_id) != serialized.end());
serialized.erase(packet_id);
}
);
Expand Down Expand Up @@ -330,8 +337,7 @@ BOOST_AUTO_TEST_CASE( publish_qos2 ) {
else {
// pubrel
c2->restore_serialized_message(
mqtt::publish_message(packet.begin(), packet.end()),
[]{}
mqtt::pubrel_message(packet.begin(), packet.end())
);
}
}
Expand Down Expand Up @@ -417,9 +423,11 @@ BOOST_AUTO_TEST_CASE( pubrel_qos2 ) {
serialized.emplace(msg.packet_id(), std::make_tuple(true, msg.continuous_buffer()));
},
[&serialized](mqtt::pubrel_message msg) {
serialized.emplace(msg.packet_id(), std::make_tuple(true, msg.continuous_buffer()));
BOOST_CHECK(serialized.find(msg.packet_id()) != serialized.end());
serialized[msg.packet_id()] = std::make_tuple(false, msg.continuous_buffer());
},
[&serialized](std::uint16_t packet_id) {
BOOST_CHECK(serialized.find(packet_id) != serialized.end());
serialized.erase(packet_id);
}
);
Expand All @@ -429,9 +437,11 @@ BOOST_AUTO_TEST_CASE( pubrel_qos2 ) {
serialized.emplace(msg.packet_id(), std::make_tuple(true, msg.continuous_buffer()));
},
[&serialized](mqtt::pubrel_message msg) {
serialized.emplace(msg.packet_id(), std::make_tuple(true, msg.continuous_buffer()));
BOOST_CHECK(serialized.find(msg.packet_id()) != serialized.end());
serialized[msg.packet_id()] = std::make_tuple(false, msg.continuous_buffer());
},
[&serialized](std::uint16_t packet_id) {
BOOST_CHECK(serialized.find(packet_id) != serialized.end());
serialized.erase(packet_id);
}
);
Expand Down Expand Up @@ -524,8 +534,7 @@ BOOST_AUTO_TEST_CASE( pubrel_qos2 ) {
else {
// pubrel
c2->restore_serialized_message(
mqtt::publish_message(packet.begin(), packet.end()),
[]{}
mqtt::pubrel_message(packet.begin(), packet.end())
);
}
}
Expand Down Expand Up @@ -619,9 +628,11 @@ BOOST_AUTO_TEST_CASE( multi_publish_qos1 ) {
serialized.emplace(msg.packet_id(), std::make_tuple(true, msg.continuous_buffer()));
},
[&serialized](mqtt::pubrel_message msg) {
serialized.emplace(msg.packet_id(), std::make_tuple(true, msg.continuous_buffer()));
BOOST_CHECK(serialized.find(msg.packet_id()) != serialized.end());
serialized[msg.packet_id()] = std::make_tuple(false, msg.continuous_buffer());
},
[&serialized](std::uint16_t packet_id) {
BOOST_CHECK(serialized.find(packet_id) != serialized.end());
serialized.erase(packet_id);
}
);
Expand All @@ -631,9 +642,11 @@ BOOST_AUTO_TEST_CASE( multi_publish_qos1 ) {
serialized.emplace(msg.packet_id(), std::make_tuple(true, msg.continuous_buffer()));
},
[&serialized](mqtt::pubrel_message msg) {
serialized.emplace(msg.packet_id(), std::make_tuple(true, msg.continuous_buffer()));
BOOST_CHECK(serialized.find(msg.packet_id()) != serialized.end());
serialized[msg.packet_id()] = std::make_tuple(false, msg.continuous_buffer());
},
[&serialized](std::uint16_t packet_id) {
BOOST_CHECK(serialized.find(packet_id) != serialized.end());
serialized.erase(packet_id);
}
);
Expand Down Expand Up @@ -733,8 +746,7 @@ BOOST_AUTO_TEST_CASE( multi_publish_qos1 ) {
else {
// pubrel
c2->restore_serialized_message(
mqtt::publish_message(packet.begin(), packet.end()),
[]{}
mqtt::pubrel_message(packet.begin(), packet.end())
);
}
}
Expand Down
16 changes: 16 additions & 0 deletions test/resend_serialize_ptr_size.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ BOOST_AUTO_TEST_CASE( publish_qos1 ) {
serialized.emplace(packet_id, std::string(data, size));
},
[&serialized](std::uint16_t packet_id, char const* data, std::size_t size) {
BOOST_CHECK(serialized.find(packet_id) != serialized.end());
serialized[packet_id] = std::string(data, size);
},
[&serialized](std::uint16_t packet_id) {
BOOST_CHECK(serialized.find(packet_id) != serialized.end());
serialized.erase(packet_id);
}
);
Expand All @@ -41,9 +43,11 @@ BOOST_AUTO_TEST_CASE( publish_qos1 ) {
serialized.emplace(packet_id, std::string(data, size));
},
[&serialized](std::uint16_t packet_id, char const* data, std::size_t size) {
BOOST_CHECK(serialized.find(packet_id) != serialized.end());
serialized[packet_id] = std::string(data, size);
},
[&serialized](std::uint16_t packet_id) {
BOOST_CHECK(serialized.find(packet_id) != serialized.end());
serialized.erase(packet_id);
}
);
Expand Down Expand Up @@ -196,9 +200,11 @@ BOOST_AUTO_TEST_CASE( publish_qos2 ) {
serialized.emplace(packet_id, std::string(data, size));
},
[&serialized](std::uint16_t packet_id, char const* data, std::size_t size) {
BOOST_CHECK(serialized.find(packet_id) != serialized.end());
serialized[packet_id] = std::string(data, size);
},
[&serialized](std::uint16_t packet_id) {
BOOST_CHECK(serialized.find(packet_id) != serialized.end());
serialized.erase(packet_id);
}
);
Expand All @@ -208,9 +214,11 @@ BOOST_AUTO_TEST_CASE( publish_qos2 ) {
serialized.emplace(packet_id, std::string(data, size));
},
[&serialized](std::uint16_t packet_id, char const* data, std::size_t size) {
BOOST_CHECK(serialized.find(packet_id) != serialized.end());
serialized[packet_id] = std::string(data, size);
},
[&serialized](std::uint16_t packet_id) {
BOOST_CHECK(serialized.find(packet_id) != serialized.end());
serialized.erase(packet_id);
}
);
Expand Down Expand Up @@ -371,9 +379,11 @@ BOOST_AUTO_TEST_CASE( pubrel_qos2 ) {
serialized.emplace(packet_id, std::string(data, size));
},
[&serialized](std::uint16_t packet_id, char const* data, std::size_t size) {
BOOST_CHECK(serialized.find(packet_id) != serialized.end());
serialized[packet_id] = std::string(data, size);
},
[&serialized](std::uint16_t packet_id) {
BOOST_CHECK(serialized.find(packet_id) != serialized.end());
serialized.erase(packet_id);
}
);
Expand All @@ -383,9 +393,11 @@ BOOST_AUTO_TEST_CASE( pubrel_qos2 ) {
serialized.emplace(packet_id, std::string(data, size));
},
[&serialized](std::uint16_t packet_id, char const* data, std::size_t size) {
BOOST_CHECK(serialized.find(packet_id) != serialized.end());
serialized[packet_id] = std::string(data, size);
},
[&serialized](std::uint16_t packet_id) {
BOOST_CHECK(serialized.find(packet_id) != serialized.end());
serialized.erase(packet_id);
}
);
Expand Down Expand Up @@ -553,9 +565,11 @@ BOOST_AUTO_TEST_CASE( multi_publish_qos1 ) {
serialized.emplace(packet_id, std::string(data, size));
},
[&serialized](std::uint16_t packet_id, char const* data, std::size_t size) {
BOOST_CHECK(serialized.find(packet_id) != serialized.end());
serialized[packet_id] = std::string(data, size);
},
[&serialized](std::uint16_t packet_id) {
BOOST_CHECK(serialized.find(packet_id) != serialized.end());
serialized.erase(packet_id);
}
);
Expand All @@ -565,9 +579,11 @@ BOOST_AUTO_TEST_CASE( multi_publish_qos1 ) {
serialized.emplace(packet_id, std::string(data, size));
},
[&serialized](std::uint16_t packet_id, char const* data, std::size_t size) {
BOOST_CHECK(serialized.find(packet_id) != serialized.end());
serialized[packet_id] = std::string(data, size);
},
[&serialized](std::uint16_t packet_id) {
BOOST_CHECK(serialized.find(packet_id) != serialized.end());
serialized.erase(packet_id);
}
);
Expand Down

0 comments on commit 965ef66

Please sign in to comment.