Skip to content

Commit

Permalink
Merge pull request #8 from redboltz/fix_remaining_length
Browse files Browse the repository at this point in the history
Fixed a remaining length calculating algorithm.
  • Loading branch information
redboltz committed Dec 18, 2015
2 parents 9006662 + 7b063fc commit 1376f55
Show file tree
Hide file tree
Showing 4 changed files with 285 additions and 6 deletions.
10 changes: 5 additions & 5 deletions include/mqtt/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -936,7 +936,7 @@ class client {
void handle_control_packet_type() {
fixed_header_ = static_cast<std::uint8_t>(buf_);
remaining_length_ = 0;
remaining_length_count_ = 0;
remaining_length_multiplier_ = 1;
as::async_read(
*socket_,
as::buffer(&buf_, 1),
Expand All @@ -951,9 +951,10 @@ class client {
}

void handle_remaining_length() {
remaining_length_ += (buf_ & 0b01111111) * remaining_length_multiplier_;
remaining_length_multiplier_ *= 128;
if (remaining_length_multiplier_ > 128 * 128 * 128) throw remaining_length_error();
if (buf_ & 0b10000000) {
if (++remaining_length_count_ > 4) throw remaining_length_error();
remaining_length_ += buf_ & 0b01111111;
as::async_read(
*socket_,
as::buffer(&buf_, 1),
Expand All @@ -967,7 +968,6 @@ class client {
});
}
else {
remaining_length_ += buf_;
payload_.resize(remaining_length_);
as::async_read(
*socket_,
Expand Down Expand Up @@ -1343,7 +1343,7 @@ class client {
std::size_t ping_duration_ms_;
char buf_;
std::uint8_t fixed_header_;
std::size_t remaining_length_count_;
std::size_t remaining_length_multiplier_;
std::size_t remaining_length_;
std::vector<char> payload_;
close_handler h_close_;
Expand Down
2 changes: 1 addition & 1 deletion include/mqtt/remaining_length.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ remaining_bytes(std::size_t size) {
if (size > 0xfffffff) throw remaining_length_error();
std::string bytes;
while (size > 127) {
bytes.push_back(size & 0b01111111);
bytes.push_back((size & 0b01111111) | 0b10000000);
size >>= 7;
}
bytes.push_back(size & 0b01111111);
Expand Down
1 change: 1 addition & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ LIST (APPEND check_PROGRAMS
test_main.cpp
connect.cpp
pubsub.cpp
remaining_length.cpp
multi_sub.cpp
resend.cpp
manual_publish.cpp
Expand Down
278 changes: 278 additions & 0 deletions test/remaining_length.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,278 @@
// Copyright Takatoshi Kondo 2015
//
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

#include "test_settings.hpp"

#include <mqtt/client.hpp>

BOOST_AUTO_TEST_SUITE(test_remaining_length)

BOOST_AUTO_TEST_CASE( pub_sub_over_127 ) {
std::string test_contents;
for (std::size_t i = 0; i < 128; ++i) {
test_contents.push_back(i);
}

boost::asio::io_service ios;
auto c = mqtt::make_client(ios, broker_url, broker_notls_port);
c.set_clean_session(true);

std::uint16_t pid_sub;
std::uint16_t pid_unsub;

int order = 0;
c.set_connack_handler(
[&order, &c, &pid_sub]
(bool sp, std::uint8_t connack_return_code) {
BOOST_TEST(order++ == 0);
BOOST_TEST(sp == false);
BOOST_TEST(connack_return_code == mqtt::connect_return_code::accepted);

// Clear retaind contents
c.publish_at_most_once(topic_base() + "/topic1", "", true);

pid_sub = c.subscribe(topic_base() + "/topic1", mqtt::qos::at_most_once);
});
c.set_close_handler(
[&order]
() {
BOOST_TEST(order++ == 4);
});
c.set_error_handler(
[]
(boost::system::error_code const&) {
BOOST_CHECK(false);
});
c.set_puback_handler(
[]
(std::uint16_t) {
BOOST_CHECK(false);
});
c.set_pubrec_handler(
[]
(std::uint16_t) {
BOOST_CHECK(false);
});
c.set_pubcomp_handler(
[]
(std::uint16_t) {
BOOST_CHECK(false);
});
c.set_suback_handler(
[&order, &c, &pid_sub, &test_contents]
(std::uint16_t packet_id, std::vector<boost::optional<std::uint8_t>> results) {
BOOST_TEST(order++ == 1);
BOOST_TEST(packet_id == pid_sub);
BOOST_TEST(results.size() == 1);
BOOST_TEST(*results[0] == mqtt::qos::at_most_once);
c.publish_at_most_once(topic_base() + "/topic1", test_contents);
});
c.set_unsuback_handler(
[&order, &c, &pid_unsub]
(std::uint16_t packet_id) {
BOOST_TEST(order++ == 3);
BOOST_TEST(packet_id == pid_unsub);
c.disconnect();
});
c.set_publish_handler(
[&order, &c, &pid_unsub, &test_contents]
(std::uint8_t header,
boost::optional<std::uint16_t> packet_id,
std::string topic,
std::string contents) {
BOOST_TEST(order++ == 2);
BOOST_TEST(mqtt::publish::is_dup(header) == false);
BOOST_TEST(mqtt::publish::get_qos(header) == mqtt::qos::at_most_once);
BOOST_TEST(mqtt::publish::is_retain(header) == false);
BOOST_CHECK(!packet_id);
BOOST_TEST(topic == topic_base() + "/topic1");
BOOST_TEST(contents == test_contents);
pid_unsub = c.unsubscribe(topic_base() + "/topic1");
});
c.connect();
ios.run();
BOOST_TEST(order++ == 5);
}

BOOST_AUTO_TEST_CASE( pub_sub_over_16384 ) {
std::string test_contents;
for (std::size_t i = 0; i < 16384; ++i) {
test_contents.push_back(i);
}

boost::asio::io_service ios;
auto c = mqtt::make_client(ios, broker_url, broker_notls_port);
c.set_clean_session(true);

std::uint16_t pid_sub;
std::uint16_t pid_unsub;

int order = 0;
c.set_connack_handler(
[&order, &c, &pid_sub]
(bool sp, std::uint8_t connack_return_code) {
BOOST_TEST(order++ == 0);
BOOST_TEST(sp == false);
BOOST_TEST(connack_return_code == mqtt::connect_return_code::accepted);

// Clear retaind contents
c.publish_at_most_once(topic_base() + "/topic1", "", true);

pid_sub = c.subscribe(topic_base() + "/topic1", mqtt::qos::at_most_once);
});
c.set_close_handler(
[&order]
() {
BOOST_TEST(order++ == 4);
});
c.set_error_handler(
[]
(boost::system::error_code const&) {
BOOST_CHECK(false);
});
c.set_puback_handler(
[]
(std::uint16_t) {
BOOST_CHECK(false);
});
c.set_pubrec_handler(
[]
(std::uint16_t) {
BOOST_CHECK(false);
});
c.set_pubcomp_handler(
[]
(std::uint16_t) {
BOOST_CHECK(false);
});
c.set_suback_handler(
[&order, &c, &pid_sub, &test_contents]
(std::uint16_t packet_id, std::vector<boost::optional<std::uint8_t>> results) {
BOOST_TEST(order++ == 1);
BOOST_TEST(packet_id == pid_sub);
BOOST_TEST(results.size() == 1);
BOOST_TEST(*results[0] == mqtt::qos::at_most_once);
c.publish_at_most_once(topic_base() + "/topic1", test_contents);
});
c.set_unsuback_handler(
[&order, &c, &pid_unsub]
(std::uint16_t packet_id) {
BOOST_TEST(order++ == 3);
BOOST_TEST(packet_id == pid_unsub);
c.disconnect();
});
c.set_publish_handler(
[&order, &c, &pid_unsub, &test_contents]
(std::uint8_t header,
boost::optional<std::uint16_t> packet_id,
std::string topic,
std::string contents) {
BOOST_TEST(order++ == 2);
BOOST_TEST(mqtt::publish::is_dup(header) == false);
BOOST_TEST(mqtt::publish::get_qos(header) == mqtt::qos::at_most_once);
BOOST_TEST(mqtt::publish::is_retain(header) == false);
BOOST_CHECK(!packet_id);
BOOST_TEST(topic == topic_base() + "/topic1");
BOOST_TEST(contents == test_contents);
pid_unsub = c.unsubscribe(topic_base() + "/topic1");
});
c.connect();
ios.run();
BOOST_TEST(order++ == 5);
}

# if 0 // It would make network load too much.

BOOST_AUTO_TEST_CASE( pub_sub_over_2097152 ) {
std::string test_contents;
for (std::size_t i = 0; i < 2097152; ++i) {
test_contents.push_back(i);
}

boost::asio::io_service ios;
auto c = mqtt::make_client(ios, broker_url, broker_notls_port);
c.set_clean_session(true);

std::uint16_t pid_sub;
std::uint16_t pid_unsub;

int order = 0;
c.set_connack_handler(
[&order, &c, &pid_sub]
(bool sp, std::uint8_t connack_return_code) {
BOOST_TEST(order++ == 0);
BOOST_TEST(sp == false);
BOOST_TEST(connack_return_code == mqtt::connect_return_code::accepted);

// Clear retaind contents
c.publish_at_most_once(topic_base() + "/topic1", "", true);

pid_sub = c.subscribe(topic_base() + "/topic1", mqtt::qos::at_most_once);
});
c.set_close_handler(
[&order]
() {
BOOST_TEST(order++ == 4);
});
c.set_error_handler(
[]
(boost::system::error_code const&) {
BOOST_CHECK(false);
});
c.set_puback_handler(
[]
(std::uint16_t) {
BOOST_CHECK(false);
});
c.set_pubrec_handler(
[]
(std::uint16_t) {
BOOST_CHECK(false);
});
c.set_pubcomp_handler(
[]
(std::uint16_t) {
BOOST_CHECK(false);
});
c.set_suback_handler(
[&order, &c, &pid_sub, &test_contents]
(std::uint16_t packet_id, std::vector<boost::optional<std::uint8_t>> results) {
BOOST_TEST(order++ == 1);
BOOST_TEST(packet_id == pid_sub);
BOOST_TEST(results.size() == 1);
BOOST_TEST(*results[0] == mqtt::qos::at_most_once);
c.publish_at_most_once(topic_base() + "/topic1", test_contents);
});
c.set_unsuback_handler(
[&order, &c, &pid_unsub]
(std::uint16_t packet_id) {
BOOST_TEST(order++ == 3);
BOOST_TEST(packet_id == pid_unsub);
c.disconnect();
});
c.set_publish_handler(
[&order, &c, &pid_unsub, &test_contents]
(std::uint8_t header,
boost::optional<std::uint16_t> packet_id,
std::string topic,
std::string contents) {
BOOST_TEST(order++ == 2);
BOOST_TEST(mqtt::publish::is_dup(header) == false);
BOOST_TEST(mqtt::publish::get_qos(header) == mqtt::qos::at_most_once);
BOOST_TEST(mqtt::publish::is_retain(header) == false);
BOOST_CHECK(!packet_id);
BOOST_TEST(topic == topic_base() + "/topic1");
BOOST_TEST(contents == test_contents);
pid_unsub = c.unsubscribe(topic_base() + "/topic1");
});
c.connect();
ios.run();
BOOST_TEST(order++ == 5);
}

#endif

BOOST_AUTO_TEST_SUITE_END()

0 comments on commit 1376f55

Please sign in to comment.