Skip to content

Commit

Permalink
remove boost from StringIdMaker, and fix some bugs. (#183)
Browse files Browse the repository at this point in the history
  • Loading branch information
ifplusor authored and ShannonDing committed Nov 11, 2019
1 parent e237d8b commit dbbc155
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 195 deletions.
10 changes: 5 additions & 5 deletions src/message/BatchMessage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "BatchMessage.h"

#include "MQDecoder.h"
#include "StringIdMaker.h"

using namespace std;
namespace rocketmq {

std::string BatchMessage::encode(std::vector<MQMessage>& msgs) {
string encodedBody;
std::string encodedBody;
for (auto message : msgs) {
string unique_id = StringIdMaker::get_mutable_instance().get_unique_id();
std::string unique_id = StringIdMaker::getInstance().createUniqID();
message.setProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, unique_id);
encodedBody.append(encode(message));
}
Expand Down Expand Up @@ -59,4 +58,5 @@ std::string BatchMessage::encode(MQMessage& message) {
encodeMsg.append(properties.c_str(), propertiesLength);
return encodeMsg;
}
}

} // namespace rocketmq
2 changes: 1 addition & 1 deletion src/producer/DefaultMQProducer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ SendResult DefaultMQProducer::sendKernelImpl(MQMessage& msg,
bool isBatchMsg = std::type_index(typeid(msg)) == std::type_index(typeid(BatchMessage));
// msgId is produced by client, offsetMsgId produced by broker. (same with java sdk)
if (!isBatchMsg) {
string unique_id = StringIdMaker::get_mutable_instance().get_unique_id();
string unique_id = StringIdMaker::getInstance().createUniqID();
msg.setProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, unique_id);

// batch does not support compressing right now,
Expand Down
228 changes: 87 additions & 141 deletions src/producer/StringIdMaker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,175 +16,121 @@
*/
#include "StringIdMaker.h"

#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <ctime>

#include "ByteOrder.h"
#include "UtilAll.h"

namespace rocketmq {

#ifdef WIN32
int gettimeofdayWin(struct timeval* tp, void* tzp) {
time_t clock;
struct tm tm;
SYSTEMTIME wtm;
GetLocalTime(&wtm);
tm.tm_year = wtm.wYear - 1900;
tm.tm_mon = wtm.wMonth - 1;
tm.tm_mday = wtm.wDay;
tm.tm_hour = wtm.wHour;
tm.tm_min = wtm.wMinute;
tm.tm_sec = wtm.wSecond;
tm.tm_isdst = -1;
clock = mktime(&tm);
tp->tv_sec = clock;
tp->tv_usec = wtm.wMilliseconds * 1000;
return (0);
}
#endif
const char StringIdMaker::sHexAlphabet[16] = {'0', '1', '2', '3', '4', '5', '6', '7',
'8', '9', 'A', 'B', 'C', 'D', 'E', 'F'};

StringIdMaker::StringIdMaker() {
memset(_buff, 0, sizeof(_buff));
memset(_0x_buff, 0, sizeof(_0x_buff));
srand((uint32_t)time(NULL));
init_prefix();
}
StringIdMaker::~StringIdMaker() {}
std::srand((uint32_t)std::time(NULL));

void StringIdMaker::init_prefix() {
uint32_t pid = getpid();
uint32_t ip = get_ip();
uint32_t random_num = (rand() % 0xFFFF);
uint32_t pid = ByteOrder::swapIfLittleEndian(static_cast<uint32_t>(getpid()));
uint32_t ip = ByteOrder::swapIfLittleEndian(getIP());
uint32_t random_num = ByteOrder::swapIfLittleEndian(static_cast<uint32_t>(std::rand()));

memcpy(_buff + 2, &pid, 4);
memcpy(_buff, &ip, 4);
memcpy(_buff + 6, &random_num, 4);
unsigned char bin_buf[10];
std::memcpy(bin_buf + 2, &pid, 4);
std::memcpy(bin_buf, &ip, 4);
std::memcpy(bin_buf + 6, &random_num, 4);

hexdump(_buff, _0x_buff, 10);
hexdump(bin_buf, kFixString, 10);
kFixString[20] = '\0';

set_start_and_next_tm();
setStartTime(UtilAll::currentTimeMillis());

mCounter = 0;
}

uint32_t StringIdMaker::get_ip() {
char name[1024];
boost::system::error_code ec;
if (boost::asio::detail::socket_ops::gethostname(name, sizeof(name), ec) != 0) {
return 0;
}
StringIdMaker::~StringIdMaker() {}

boost::asio::io_service io_service;
boost::asio::ip::tcp::resolver resolver(io_service);
boost::asio::ip::tcp::resolver::query query(name, "");
boost::system::error_code error;
boost::asio::ip::tcp::resolver::iterator iter = resolver.resolve(query, error);
if (error) {
uint32_t StringIdMaker::getIP() {
std::string ip = UtilAll::getLocalAddress();
if (ip.empty()) {
return 0;
}
boost::asio::ip::tcp::resolver::iterator end; // End marker.
boost::asio::ip::tcp::endpoint ep;
while (iter != end) {
ep = *iter++;
}
std::string s_localIpAddress = ep.address().to_string();

int a[4];
std::string IP = s_localIpAddress;
std::string strTemp;
size_t pos;
size_t i = 3;

do {
pos = IP.find(".");

if (pos != std::string::npos) {
strTemp = IP.substr(0, pos);
a[i] = atoi(strTemp.c_str());
i--;
IP.erase(0, pos + 1);
} else {
strTemp = IP;
a[i] = atoi(strTemp.c_str());
break;
}

} while (1);

uint32_t nResult = (a[3] << 24) + (a[2] << 16) + (a[1] << 8) + a[0];
return nResult;
}

uint64_t StringIdMaker::get_curr_ms() {
struct timeval time_now;
// windows and linux use the same function name, windows's defination as begining this file
#ifdef WIN32
gettimeofdayWin(&time_now, NULL); // WIN32
#else
gettimeofday(&time_now, NULL); // LINUX
#endif

uint64_t ms_time = time_now.tv_sec * 1000 + time_now.tv_usec / 1000;
return ms_time;
}
char* ip_str = new char[ip.length() + 1];
std::strncpy(ip_str, ip.c_str(), ip.length());
ip_str[ip.length()] = '\0';

void StringIdMaker::set_start_and_next_tm() {
time_t tmNow = time(NULL);
tm* ptmNow = localtime(&tmNow);
tm mon_begin;
mon_begin.tm_year = ptmNow->tm_year;
mon_begin.tm_mon = ptmNow->tm_mon;
mon_begin.tm_mday = 0;
mon_begin.tm_hour = 0;
mon_begin.tm_min = 0;
mon_begin.tm_sec = 0;

tm mon_next_begin;
if (ptmNow->tm_mon == 12) {
mon_next_begin.tm_year = ptmNow->tm_year + 1;
mon_next_begin.tm_mon = 1;
} else {
mon_next_begin.tm_year = ptmNow->tm_year;
mon_next_begin.tm_mon = ptmNow->tm_mon + 1;
int i = 3;
uint32_t nResult = 0;
for (char* token = std::strtok(ip_str, "."); token != nullptr && i >= 0; token = std::strtok(nullptr, ".")) {
uint32_t n = std::atoi(token);
nResult |= n << (8 * i--);
}
mon_next_begin.tm_mday = 0;
mon_next_begin.tm_hour = 0;
mon_next_begin.tm_min = 0;
mon_next_begin.tm_sec = 0;

time_t mon_begin_tm = mktime(&mon_begin);
time_t mon_end_tm = mktime(&mon_next_begin);
delete[] ip_str;

_start_tm = mon_begin_tm * 1000;
_next_start_tm = mon_end_tm * 1000;
return nResult;
}

int StringIdMaker::atomic_incr(int id) {
#ifdef WIN32
InterlockedIncrement((LONG*)&id);
#else
__sync_add_and_fetch(&id, 1);
#endif
return id;
void StringIdMaker::setStartTime(uint64_t millis) {
// std::time_t
// Although not defined, this is almost always an integral value holding the number of seconds
// (not counting leap seconds) since 00:00, Jan 1 1970 UTC, corresponding to POSIX time.
std::time_t tmNow = millis / 1000;
std::tm* ptmNow = std::localtime(&tmNow); // may not be thread-safe

std::tm curMonthBegin = {0};
curMonthBegin.tm_year = ptmNow->tm_year; // since 1900
curMonthBegin.tm_mon = ptmNow->tm_mon; // [0, 11]
curMonthBegin.tm_mday = 1; // [1, 31]
curMonthBegin.tm_hour = 0; // [0, 23]
curMonthBegin.tm_min = 0; // [0, 59]
curMonthBegin.tm_sec = 0; // [0, 60]

std::tm nextMonthBegin = {0};
if (ptmNow->tm_mon >= 11) {
nextMonthBegin.tm_year = ptmNow->tm_year + 1;
nextMonthBegin.tm_mon = 0;
} else {
nextMonthBegin.tm_year = ptmNow->tm_year;
nextMonthBegin.tm_mon = ptmNow->tm_mon + 1;
}
nextMonthBegin.tm_mday = 1;
nextMonthBegin.tm_hour = 0;
nextMonthBegin.tm_min = 0;
nextMonthBegin.tm_sec = 0;

mStartTime = std::mktime(&curMonthBegin) * 1000;
mNextStartTime = std::mktime(&nextMonthBegin) * 1000;
}
std::string StringIdMaker::get_unique_id() {
uint64_t now_time = get_curr_ms();

if (now_time > _next_start_tm) {
set_start_and_next_tm();
std::string StringIdMaker::createUniqID() {
uint64_t current = UtilAll::currentTimeMillis();
if (current >= mNextStartTime) {
setStartTime(current);
current = UtilAll::currentTimeMillis();
}
uint32_t tm_period = now_time - _start_tm;
seqid = atomic_incr(seqid) & 0xFF;

std::size_t prifix_len = 10; // 10 = prefix len
unsigned char* write_index = _buff + prifix_len;
uint32_t period = ByteOrder::swapIfLittleEndian(static_cast<uint32_t>(current - mStartTime));
uint16_t seqid = ByteOrder::swapIfLittleEndian(mCounter++);

memcpy(write_index, &tm_period, 4);
write_index += 4;
unsigned char bin_buf[6];
std::memcpy(bin_buf, &period, 4);
std::memcpy(bin_buf + 4, &seqid, 2);

memcpy(write_index, &seqid, 2);
char hex_buf[12];
hexdump(bin_buf, hex_buf, 6);

hexdump(_buff + prifix_len, (_0x_buff + (2 * prifix_len)), 6);
_0x_buff[32] = '\0';
return std::string(_0x_buff);
return std::string(kFixString, 20) + std::string(hex_buf, 12);
}

void StringIdMaker::hexdump(unsigned char* buffer, char* out_buff, unsigned long index) {
for (unsigned long i = 0; i < index; i++) {
sprintf(out_buff + 2 * i, "%02X ", buffer[i]);
void StringIdMaker::hexdump(unsigned char* in, char* out, std::size_t len) {
for (std::size_t i = 0; i < len; i++) {
unsigned char v = in[i];
out[i * 2] = sHexAlphabet[v >> 4];
out[i * 2 + 1] = sHexAlphabet[v & 0x0FU];
}
}
}

} // namespace rocketmq
76 changes: 33 additions & 43 deletions src/producer/StringIdMaker.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,61 +14,51 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*
ip: 4
pid: 4
随机数 :2
时间:4
自增数:2
*/
#ifndef __STRINGID_MAKER_H__
#define __STRINGID_MAKER_H__

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <boost/serialization/singleton.hpp>
#include <atomic>
#include <cstdint>
#include <string>
#include <boost/asio.hpp>

#ifdef WIN32
#include <windows.h>
#else
#include <unistd.h>
#include <sys/time.h>
#endif

#ifdef WIN32
#include <windows.h>
#else
#include <unistd.h>
#include <sys/time.h>
#endif

namespace rocketmq {
class StringIdMaker : public boost::serialization::singleton<StringIdMaker> {
public:

class StringIdMaker {
private:
StringIdMaker();
~StringIdMaker();
std::string get_unique_id();

public:
static StringIdMaker& getInstance() {
// After c++11, the initialization occurs exactly once
static StringIdMaker singleton_;
return singleton_;
}

/* ID format:
* ip: 4 bytes
* pid: 2 bytes
* random: 4 bytes
* time: 4 bytes
* auto num: 2 bytes
*/
std::string createUniqID();

private:
uint32_t get_ip();
void init_prefix();
uint64_t get_curr_ms();
int atomic_incr(int id);
void set_start_and_next_tm();
void setStartTime(uint64_t millis);

void hexdump(unsigned char* buffer, char* out_buff, unsigned long index);
static uint32_t getIP();
static void hexdump(unsigned char* buffer, char* out_buff, unsigned long index);

private:
uint64_t _start_tm;
uint64_t _next_start_tm;
unsigned char _buff[16];
char _0x_buff[33];
int16_t seqid;
uint64_t mStartTime;
uint64_t mNextStartTime;
std::atomic<uint16_t> mCounter;

char kFixString[21];

static const char sHexAlphabet[16];
};
}

} // namespace rocketmq
#endif
Loading

0 comments on commit dbbc155

Please sign in to comment.