From 7b063fc4952772cf04a394807736669e7a3c879a Mon Sep 17 00:00:00 2001 From: Takatoshi Kondo Date: Fri, 18 Dec 2015 11:20:42 +0900 Subject: [PATCH] Fixed a remaining length calculating algorithm. Added tests for remaining length. --- include/mqtt/client.hpp | 10 +- include/mqtt/remaining_length.hpp | 2 +- test/CMakeLists.txt | 1 + test/remaining_length.cpp | 278 ++++++++++++++++++++++++++++++ 4 files changed, 285 insertions(+), 6 deletions(-) create mode 100644 test/remaining_length.cpp diff --git a/include/mqtt/client.hpp b/include/mqtt/client.hpp index e9d337aa4..51275d4c8 100644 --- a/include/mqtt/client.hpp +++ b/include/mqtt/client.hpp @@ -936,7 +936,7 @@ class client { void handle_control_packet_type() { fixed_header_ = static_cast(buf_); remaining_length_ = 0; - remaining_length_count_ = 0; + remaining_length_multiplier_ = 1; as::async_read( *socket_, as::buffer(&buf_, 1), @@ -951,9 +951,10 @@ class client { } void handle_remaining_length() { + remaining_length_ += (buf_ & 0b01111111) * remaining_length_multiplier_; + remaining_length_multiplier_ *= 128; + if (remaining_length_multiplier_ > 128 * 128 * 128) throw remaining_length_error(); if (buf_ & 0b10000000) { - if (++remaining_length_count_ > 4) throw remaining_length_error(); - remaining_length_ += buf_ & 0b01111111; as::async_read( *socket_, as::buffer(&buf_, 1), @@ -967,7 +968,6 @@ class client { }); } else { - remaining_length_ += buf_; payload_.resize(remaining_length_); as::async_read( *socket_, @@ -1343,7 +1343,7 @@ class client { std::size_t ping_duration_ms_; char buf_; std::uint8_t fixed_header_; - std::size_t remaining_length_count_; + std::size_t remaining_length_multiplier_; std::size_t remaining_length_; std::vector payload_; close_handler h_close_; diff --git a/include/mqtt/remaining_length.hpp b/include/mqtt/remaining_length.hpp index 9d8f73c5f..99ede0129 100644 --- a/include/mqtt/remaining_length.hpp +++ b/include/mqtt/remaining_length.hpp @@ -17,7 +17,7 @@ remaining_bytes(std::size_t size) { if (size > 0xfffffff) throw remaining_length_error(); std::string bytes; while (size > 127) { - bytes.push_back(size & 0b01111111); + bytes.push_back((size & 0b01111111) | 0b10000000); size >>= 7; } bytes.push_back(size & 0b01111111); diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 66a2261a2..8f15dce2b 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -12,6 +12,7 @@ LIST (APPEND check_PROGRAMS test_main.cpp connect.cpp pubsub.cpp + remaining_length.cpp multi_sub.cpp resend.cpp manual_publish.cpp diff --git a/test/remaining_length.cpp b/test/remaining_length.cpp new file mode 100644 index 000000000..8fadd4c3b --- /dev/null +++ b/test/remaining_length.cpp @@ -0,0 +1,278 @@ +// Copyright Takatoshi Kondo 2015 +// +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include "test_settings.hpp" + +#include + +BOOST_AUTO_TEST_SUITE(test_remaining_length) + +BOOST_AUTO_TEST_CASE( pub_sub_over_127 ) { + std::string test_contents; + for (std::size_t i = 0; i < 128; ++i) { + test_contents.push_back(i); + } + + boost::asio::io_service ios; + auto c = mqtt::make_client(ios, broker_url, broker_notls_port); + c.set_clean_session(true); + + std::uint16_t pid_sub; + std::uint16_t pid_unsub; + + int order = 0; + c.set_connack_handler( + [&order, &c, &pid_sub] + (bool sp, std::uint8_t connack_return_code) { + BOOST_TEST(order++ == 0); + BOOST_TEST(sp == false); + BOOST_TEST(connack_return_code == mqtt::connect_return_code::accepted); + + // Clear retaind contents + c.publish_at_most_once(topic_base() + "/topic1", "", true); + + pid_sub = c.subscribe(topic_base() + "/topic1", mqtt::qos::at_most_once); + }); + c.set_close_handler( + [&order] + () { + BOOST_TEST(order++ == 4); + }); + c.set_error_handler( + [] + (boost::system::error_code const&) { + BOOST_CHECK(false); + }); + c.set_puback_handler( + [] + (std::uint16_t) { + BOOST_CHECK(false); + }); + c.set_pubrec_handler( + [] + (std::uint16_t) { + BOOST_CHECK(false); + }); + c.set_pubcomp_handler( + [] + (std::uint16_t) { + BOOST_CHECK(false); + }); + c.set_suback_handler( + [&order, &c, &pid_sub, &test_contents] + (std::uint16_t packet_id, std::vector> results) { + BOOST_TEST(order++ == 1); + BOOST_TEST(packet_id == pid_sub); + BOOST_TEST(results.size() == 1); + BOOST_TEST(*results[0] == mqtt::qos::at_most_once); + c.publish_at_most_once(topic_base() + "/topic1", test_contents); + }); + c.set_unsuback_handler( + [&order, &c, &pid_unsub] + (std::uint16_t packet_id) { + BOOST_TEST(order++ == 3); + BOOST_TEST(packet_id == pid_unsub); + c.disconnect(); + }); + c.set_publish_handler( + [&order, &c, &pid_unsub, &test_contents] + (std::uint8_t header, + boost::optional packet_id, + std::string topic, + std::string contents) { + BOOST_TEST(order++ == 2); + BOOST_TEST(mqtt::publish::is_dup(header) == false); + BOOST_TEST(mqtt::publish::get_qos(header) == mqtt::qos::at_most_once); + BOOST_TEST(mqtt::publish::is_retain(header) == false); + BOOST_CHECK(!packet_id); + BOOST_TEST(topic == topic_base() + "/topic1"); + BOOST_TEST(contents == test_contents); + pid_unsub = c.unsubscribe(topic_base() + "/topic1"); + }); + c.connect(); + ios.run(); + BOOST_TEST(order++ == 5); +} + +BOOST_AUTO_TEST_CASE( pub_sub_over_16384 ) { + std::string test_contents; + for (std::size_t i = 0; i < 16384; ++i) { + test_contents.push_back(i); + } + + boost::asio::io_service ios; + auto c = mqtt::make_client(ios, broker_url, broker_notls_port); + c.set_clean_session(true); + + std::uint16_t pid_sub; + std::uint16_t pid_unsub; + + int order = 0; + c.set_connack_handler( + [&order, &c, &pid_sub] + (bool sp, std::uint8_t connack_return_code) { + BOOST_TEST(order++ == 0); + BOOST_TEST(sp == false); + BOOST_TEST(connack_return_code == mqtt::connect_return_code::accepted); + + // Clear retaind contents + c.publish_at_most_once(topic_base() + "/topic1", "", true); + + pid_sub = c.subscribe(topic_base() + "/topic1", mqtt::qos::at_most_once); + }); + c.set_close_handler( + [&order] + () { + BOOST_TEST(order++ == 4); + }); + c.set_error_handler( + [] + (boost::system::error_code const&) { + BOOST_CHECK(false); + }); + c.set_puback_handler( + [] + (std::uint16_t) { + BOOST_CHECK(false); + }); + c.set_pubrec_handler( + [] + (std::uint16_t) { + BOOST_CHECK(false); + }); + c.set_pubcomp_handler( + [] + (std::uint16_t) { + BOOST_CHECK(false); + }); + c.set_suback_handler( + [&order, &c, &pid_sub, &test_contents] + (std::uint16_t packet_id, std::vector> results) { + BOOST_TEST(order++ == 1); + BOOST_TEST(packet_id == pid_sub); + BOOST_TEST(results.size() == 1); + BOOST_TEST(*results[0] == mqtt::qos::at_most_once); + c.publish_at_most_once(topic_base() + "/topic1", test_contents); + }); + c.set_unsuback_handler( + [&order, &c, &pid_unsub] + (std::uint16_t packet_id) { + BOOST_TEST(order++ == 3); + BOOST_TEST(packet_id == pid_unsub); + c.disconnect(); + }); + c.set_publish_handler( + [&order, &c, &pid_unsub, &test_contents] + (std::uint8_t header, + boost::optional packet_id, + std::string topic, + std::string contents) { + BOOST_TEST(order++ == 2); + BOOST_TEST(mqtt::publish::is_dup(header) == false); + BOOST_TEST(mqtt::publish::get_qos(header) == mqtt::qos::at_most_once); + BOOST_TEST(mqtt::publish::is_retain(header) == false); + BOOST_CHECK(!packet_id); + BOOST_TEST(topic == topic_base() + "/topic1"); + BOOST_TEST(contents == test_contents); + pid_unsub = c.unsubscribe(topic_base() + "/topic1"); + }); + c.connect(); + ios.run(); + BOOST_TEST(order++ == 5); +} + +# if 0 // It would make network load too much. + +BOOST_AUTO_TEST_CASE( pub_sub_over_2097152 ) { + std::string test_contents; + for (std::size_t i = 0; i < 2097152; ++i) { + test_contents.push_back(i); + } + + boost::asio::io_service ios; + auto c = mqtt::make_client(ios, broker_url, broker_notls_port); + c.set_clean_session(true); + + std::uint16_t pid_sub; + std::uint16_t pid_unsub; + + int order = 0; + c.set_connack_handler( + [&order, &c, &pid_sub] + (bool sp, std::uint8_t connack_return_code) { + BOOST_TEST(order++ == 0); + BOOST_TEST(sp == false); + BOOST_TEST(connack_return_code == mqtt::connect_return_code::accepted); + + // Clear retaind contents + c.publish_at_most_once(topic_base() + "/topic1", "", true); + + pid_sub = c.subscribe(topic_base() + "/topic1", mqtt::qos::at_most_once); + }); + c.set_close_handler( + [&order] + () { + BOOST_TEST(order++ == 4); + }); + c.set_error_handler( + [] + (boost::system::error_code const&) { + BOOST_CHECK(false); + }); + c.set_puback_handler( + [] + (std::uint16_t) { + BOOST_CHECK(false); + }); + c.set_pubrec_handler( + [] + (std::uint16_t) { + BOOST_CHECK(false); + }); + c.set_pubcomp_handler( + [] + (std::uint16_t) { + BOOST_CHECK(false); + }); + c.set_suback_handler( + [&order, &c, &pid_sub, &test_contents] + (std::uint16_t packet_id, std::vector> results) { + BOOST_TEST(order++ == 1); + BOOST_TEST(packet_id == pid_sub); + BOOST_TEST(results.size() == 1); + BOOST_TEST(*results[0] == mqtt::qos::at_most_once); + c.publish_at_most_once(topic_base() + "/topic1", test_contents); + }); + c.set_unsuback_handler( + [&order, &c, &pid_unsub] + (std::uint16_t packet_id) { + BOOST_TEST(order++ == 3); + BOOST_TEST(packet_id == pid_unsub); + c.disconnect(); + }); + c.set_publish_handler( + [&order, &c, &pid_unsub, &test_contents] + (std::uint8_t header, + boost::optional packet_id, + std::string topic, + std::string contents) { + BOOST_TEST(order++ == 2); + BOOST_TEST(mqtt::publish::is_dup(header) == false); + BOOST_TEST(mqtt::publish::get_qos(header) == mqtt::qos::at_most_once); + BOOST_TEST(mqtt::publish::is_retain(header) == false); + BOOST_CHECK(!packet_id); + BOOST_TEST(topic == topic_base() + "/topic1"); + BOOST_TEST(contents == test_contents); + pid_unsub = c.unsubscribe(topic_base() + "/topic1"); + }); + c.connect(); + ios.run(); + BOOST_TEST(order++ == 5); +} + +#endif + +BOOST_AUTO_TEST_SUITE_END()