diff --git a/dbms/src/Debug/MockKVStore/MockUtils.h b/dbms/src/Debug/MockKVStore/MockUtils.h index 44696999694..69cc246245d 100644 --- a/dbms/src/Debug/MockKVStore/MockUtils.h +++ b/dbms/src/Debug/MockKVStore/MockUtils.h @@ -96,4 +96,70 @@ inline RegionPtr makeRegion(RegionMeta && meta) return std::make_shared(std::move(meta), nullptr); } +// Generates a lock value which fills all fields, only for test use. +inline TiKVValue encodeFullLockCfValue( + UInt8 lock_type, + const String & primary, + Timestamp ts, + UInt64 ttl, + const String * short_value, + Timestamp min_commit_ts, + Timestamp for_update_ts, + uint64_t txn_size, + const std::vector & async_commit, + const std::vector & rollback, + UInt64 generation = 0) +{ + auto lock_value = RecordKVFormat::encodeLockCfValue(lock_type, primary, ts, ttl, short_value, min_commit_ts); + WriteBufferFromOwnString res; + res.write(lock_value.getStr().data(), lock_value.getStr().size()); + { + res.write(RecordKVFormat::MIN_COMMIT_TS_PREFIX); + RecordKVFormat::encodeUInt64(min_commit_ts, res); + } + { + res.write(RecordKVFormat::FOR_UPDATE_TS_PREFIX); + RecordKVFormat::encodeUInt64(for_update_ts, res); + } + { + res.write(RecordKVFormat::TXN_SIZE_PREFIX); + RecordKVFormat::encodeUInt64(txn_size, res); + } + { + res.write(RecordKVFormat::ROLLBACK_TS_PREFIX); + TiKV::writeVarUInt(rollback.size(), res); + for (auto ts : rollback) + { + RecordKVFormat::encodeUInt64(ts, res); + } + } + { + res.write(RecordKVFormat::ASYNC_COMMIT_PREFIX); + TiKV::writeVarUInt(async_commit.size(), res); + for (const auto & s : async_commit) + { + writeVarInt(s.size(), res); + res.write(s.data(), s.size()); + } + } + { + res.write(RecordKVFormat::LAST_CHANGE_PREFIX); + RecordKVFormat::encodeUInt64(12345678, res); + TiKV::writeVarUInt(87654321, res); + } + { + res.write(RecordKVFormat::TXN_SOURCE_PREFIX_FOR_LOCK); + TiKV::writeVarUInt(876543, res); + } + { + res.write(RecordKVFormat::PESSIMISTIC_LOCK_WITH_CONFLICT_PREFIX); + } + if (generation > 0) + { + res.write(RecordKVFormat::GENERATION_PREFIX); + RecordKVFormat::encodeUInt64(generation, res); + } + return TiKVValue(res.releaseStr()); +} + } // namespace DB::RegionBench diff --git a/dbms/src/Storages/KVStore/Decode/RegionTable.cpp b/dbms/src/Storages/KVStore/Decode/RegionTable.cpp index 4425bd932c0..a209d9b7f38 100644 --- a/dbms/src/Storages/KVStore/Decode/RegionTable.cpp +++ b/dbms/src/Storages/KVStore/Decode/RegionTable.cpp @@ -46,6 +46,11 @@ namespace FailPoints extern const char force_set_num_regions_for_table[]; } // namespace FailPoints +void RegionTable::InternalRegion::updateRegionCacheBytes(size_t cache_bytes_) +{ + cache_bytes = cache_bytes_; +} + RegionTable::Table & RegionTable::getOrCreateTable(const KeyspaceID keyspace_id, const TableID table_id) { auto ks_table_id = KeyspaceTableID{keyspace_id, table_id}; @@ -156,7 +161,7 @@ void RegionTable::updateRegion(const Region & region) { std::lock_guard lock(mutex); auto & internal_region = getOrInsertRegion(region); - internal_region.cache_bytes = region.dataSize(); + internal_region.updateRegionCacheBytes(region.dataSize()); } namespace @@ -308,9 +313,7 @@ RegionDataReadInfoList RegionTable::tryWriteBlockByRegion(const RegionPtr & regi func_update_region([&](InternalRegion & internal_region) -> bool { internal_region.pause_flush = false; - internal_region.cache_bytes = region->dataSize(); - - internal_region.last_flush_time = Clock::now(); + internal_region.updateRegionCacheBytes(region->dataSize()); return true; }); @@ -380,7 +383,7 @@ void RegionTable::shrinkRegionRange(const Region & region) std::lock_guard lock(mutex); auto & internal_region = getOrInsertRegion(region); internal_region.range_in_table = region.getRange()->rawKeys(); - internal_region.cache_bytes = region.dataSize(); + internal_region.updateRegionCacheBytes(region.dataSize()); } void RegionTable::extendRegionRange(const RegionID region_id, const RegionRangeKeys & region_range_keys) diff --git a/dbms/src/Storages/KVStore/Decode/RegionTable.h b/dbms/src/Storages/KVStore/Decode/RegionTable.h index d0eaf4f80ec..1c15c18624d 100644 --- a/dbms/src/Storages/KVStore/Decode/RegionTable.h +++ b/dbms/src/Storages/KVStore/Decode/RegionTable.h @@ -78,11 +78,14 @@ class RegionTable : private boost::noncopyable , range_in_table(range_in_table_) {} + void updateRegionCacheBytes(size_t); + RegionID region_id; std::pair range_in_table; bool pause_flush = false; + + private: Int64 cache_bytes = 0; - Timepoint last_flush_time = Clock::now(); }; using InternalRegions = std::unordered_map; diff --git a/dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp b/dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp index a765e1f0066..a1883bca63b 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp @@ -374,11 +374,11 @@ std::pair Region::handleWriteRaftCmd( if unlikely (is_v2) { // There may be orphan default key in a snapshot. - write_size += doInsert(cf, std::move(tikv_key), std::move(tikv_value), DupCheck::AllowSame); + write_size += doInsert(cf, std::move(tikv_key), std::move(tikv_value), DupCheck::AllowSame).payload; } else { - write_size += doInsert(cf, std::move(tikv_key), std::move(tikv_value), DupCheck::Deny); + write_size += doInsert(cf, std::move(tikv_key), std::move(tikv_value), DupCheck::Deny).payload; } } catch (Exception & e) diff --git a/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.cpp b/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.cpp index 38778e5ac17..93a79394cae 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.cpp @@ -28,61 +28,21 @@ const TiKVKey & RegionCFDataBase::getTiKVKey(const Value & val) return *std::get<0>(val); } -template -const std::shared_ptr & getTiKVValuePtr(const Value & val) -{ - return std::get<1>(val); -} - template const TiKVValue & RegionCFDataBase::getTiKVValue(const Value & val) { - return *getTiKVValuePtr(val); + return *std::get<1>(val); } template -RegionDataRes RegionCFDataBase::insert(TiKVKey && key, TiKVValue && value, DupCheck mode) +RegionDataMemDiff RegionCFDataBase::insert(TiKVKey && key, TiKVValue && value, DupCheck mode) { auto raw_key = RecordKVFormat::decodeTiKVKey(key); - auto kv_pair = Trait::genKVPair(std::move(key), std::move(raw_key), std::move(value)); - if (!kv_pair) - return 0; - - return insert(std::move(*kv_pair), mode); -} - -template <> -RegionDataRes RegionCFDataBase::insert(TiKVKey && key, TiKVValue && value, DupCheck mode) -{ - UNUSED(mode); - RegionDataRes added_size = calcTiKVKeyValueSize(key, value); - Pair kv_pair = RegionLockCFDataTrait::genKVPair(std::move(key), std::move(value)); - const auto & decoded = std::get<2>(kv_pair.second); - bool is_large_txn = decoded->isLargeTxn(); - { - auto iter = data.find(kv_pair.first); - if (iter != data.end()) - { - // Could be a perssimistic lock is overwritten, or a old generation large txn lock is overwritten. - added_size -= calcTiKVKeyValueSize(iter->second); - data.erase(iter); + auto kv_pair_ptr = Trait::genKVPair(std::move(key), std::move(raw_key), std::move(value)); + if (!kv_pair_ptr) + return {}; - // In most cases, an optimistic lock replace a pessimistic lock. - GET_METRIC(tiflash_raft_process_keys, type_lock_replaced).Increment(1); - } - if unlikely (is_large_txn) - { - GET_METRIC(tiflash_raft_process_keys, type_large_txn_lock_put).Increment(1); - } - } - // According to the process of pessimistic lock, just overwrite. - data.emplace(std::move(kv_pair.first), std::move(kv_pair.second)); - return added_size; -} - -template -RegionDataRes RegionCFDataBase::insert(std::pair && kv_pair, DupCheck mode) -{ + auto & kv_pair = *kv_pair_ptr; auto & map = data; TiKVValue prev_value; if (mode == DupCheck::AllowSame) @@ -113,8 +73,8 @@ RegionDataRes RegionCFDataBase::insert(std::pair && kv_pair, + " new_val: " + prev_value.toDebugString(), ErrorCodes::LOGICAL_ERROR); } - // duplicated key is ignored - return 0; + // Duplicated key is ignored + return {}; } else { @@ -124,44 +84,76 @@ RegionDataRes RegionCFDataBase::insert(std::pair && kv_pair, } } - return calcTiKVKeyValueSize(it->second); + return calcTotalKVSize(it->second); } -template -size_t RegionCFDataBase::calcTiKVKeyValueSize(const Value & value) +template <> +RegionDataMemDiff RegionCFDataBase::insert(TiKVKey && key, TiKVValue && value, DupCheck mode) { - return calcTiKVKeyValueSize(getTiKVKey(value), getTiKVValue(value)); + UNUSED(mode); + RegionDataMemDiff delta; + Pair kv_pair = RegionLockCFDataTrait::genKVPair(std::move(key), std::move(value)); + const auto & decoded = std::get<2>(kv_pair.second); + bool is_large_txn = decoded->isLargeTxn(); + { + auto iter = data.find(kv_pair.first); + if (iter != data.end()) + { + // Could be a pessimistic lock is overwritten, or an old generation large txn lock is overwritten. + delta.sub(calcTotalKVSize(iter->second)); + data.erase(iter); + + // In most cases, an optimistic lock replace a pessimistic lock. + GET_METRIC(tiflash_raft_process_keys, type_lock_replaced).Increment(1); + } + if unlikely (is_large_txn) + { + GET_METRIC(tiflash_raft_process_keys, type_large_txn_lock_put).Increment(1); + } + } + // According to the process of pessimistic lock, just overwrite. + if (const auto & [it, ok] = data.emplace(std::move(kv_pair.first), std::move(kv_pair.second)); ok) + { + delta.add(calcTotalKVSize(it->second)); + } + return delta; } template -size_t RegionCFDataBase::calcTiKVKeyValueSize(const TiKVKey & key, const TiKVValue & value) +RegionDataMemDiff RegionCFDataBase::calcTotalKVSize(const Value & value) { + Int64 payload_size = getTiKVKey(value).dataSize() + getTiKVValue(value).dataSize(); if constexpr (std::is_same::value) { - // We start to count size of Lock Cf. - return key.dataSize() + value.dataSize(); + return { + // We start to count the size of Lock Cf since #8805 + payload_size, + // The decoded size of Lock Cf + static_cast(std::get<2>(value)->getSize()), + }; } else { - return key.dataSize() + value.dataSize(); + // No decoded data in write & default cf currently. + return {payload_size, 0}; } } - template -bool RegionCFDataBase::shouldIgnoreRemove(const RegionCFDataBase::Value &) +bool RegionCFDataBase::shouldIgnoreRemove(const RegionCFDataBase::Value & value) { - return false; -} - -template <> -bool RegionCFDataBase::shouldIgnoreRemove(const RegionCFDataBase::Value & value) -{ - return RegionWriteCFDataTrait::getWriteType(value) == CFModifyFlag::DelFlag; + if constexpr (std::is_same::value) + { + return RegionWriteCFDataTrait::getWriteType(value) == CFModifyFlag::DelFlag; + } + else + { + return false; + } } template -size_t RegionCFDataBase::remove(const Key & key, bool quiet) +RegionDataMemDiff RegionCFDataBase::remove(const Key & key, bool quiet) { auto & map = data; @@ -170,10 +162,9 @@ size_t RegionCFDataBase::remove(const Key & key, bool quiet) const Value & value = it->second; if (shouldIgnoreRemove(value)) - return 0; - - size_t size = calcTiKVKeyValueSize(value); + return {}; + auto delta = calcTotalKVSize(value).negative(); if constexpr (std::is_same::value) { if unlikely (std::get<2>(value)->isLargeTxn()) @@ -182,12 +173,12 @@ size_t RegionCFDataBase::remove(const Key & key, bool quiet) } } map.erase(it); - return size; + return delta; } else if (!quiet) throw Exception("Key not found", ErrorCodes::LOGICAL_ERROR); - return 0; + return {}; } template @@ -221,19 +212,19 @@ size_t RegionCFDataBase::getSize() const } template -RegionCFDataBase::RegionCFDataBase(RegionCFDataBase && region) +RegionCFDataBase::RegionCFDataBase(RegionCFDataBase && region) noexcept : data(std::move(region.data)) {} template -RegionCFDataBase & RegionCFDataBase::operator=(RegionCFDataBase && region) +RegionCFDataBase & RegionCFDataBase::operator=(RegionCFDataBase && region) noexcept { data = std::move(region.data); return *this; } template -std::string getTraitName() +std::string_view getTraitName() { if constexpr (std::is_same_v) { @@ -254,9 +245,9 @@ std::string getTraitName() } template -size_t RegionCFDataBase::mergeFrom(const RegionCFDataBase & ori_region_data) +RegionDataMemDiff RegionCFDataBase::mergeFrom(const RegionCFDataBase & ori_region_data) { - size_t size_changed = 0; + RegionDataMemDiff res; const auto & ori_map = ori_region_data.data; auto & tar_map = data; @@ -264,7 +255,7 @@ size_t RegionCFDataBase::mergeFrom(const RegionCFDataBase & ori_region_da size_t ori_key_count = ori_region_data.getSize(); for (auto it = ori_map.begin(); it != ori_map.end(); it++) { - size_changed += calcTiKVKeyValueSize(it->second); + res.add(calcTotalKVSize(it->second)); auto ok = tar_map.emplace(*it).second; if (!ok) throw Exception( @@ -276,14 +267,14 @@ size_t RegionCFDataBase::mergeFrom(const RegionCFDataBase & ori_region_da ori_key_count); } - return size_changed; + return res; } template -size_t RegionCFDataBase::splitInto(const RegionRange & range, RegionCFDataBase & new_region_data) +RegionDataMemDiff RegionCFDataBase::splitInto(const RegionRange & range, RegionCFDataBase & new_region_data) { const auto & [start_key, end_key] = range; - size_t size_changed = 0; + RegionDataMemDiff res; { auto & ori_map = data; @@ -295,7 +286,7 @@ size_t RegionCFDataBase::splitInto(const RegionRange & range, RegionCFDat if (start_key.compare(key) <= 0 && end_key.compare(key) > 0) { - size_changed += calcTiKVKeyValueSize(it->second); + res.sub(calcTotalKVSize(it->second)); tar_map.insert(std::move(*it)); it = ori_map.erase(it); } @@ -303,16 +294,14 @@ size_t RegionCFDataBase::splitInto(const RegionRange & range, RegionCFDat ++it; } } - return size_changed; + return res; } template size_t RegionCFDataBase::serialize(WriteBuffer & buf) const { size_t total_size = 0; - size_t size = getSize(); - total_size += writeBinary2(size, buf); for (const auto & ele : data) @@ -322,22 +311,22 @@ size_t RegionCFDataBase::serialize(WriteBuffer & buf) const total_size += key.serialize(buf); total_size += value.serialize(buf); } - return total_size; } template -size_t RegionCFDataBase::deserialize(ReadBuffer & buf, RegionCFDataBase & new_region_data) +RegionDataMemDiff RegionCFDataBase::deserialize(ReadBuffer & buf, RegionCFDataBase & new_region_data) { auto size = readBinary2(buf); - size_t cf_data_size = 0; + RegionDataMemDiff total_diff; for (size_t i = 0; i < size; ++i) { auto key = TiKVKey::deserialize(buf); auto value = TiKVValue::deserialize(buf); - cf_data_size += new_region_data.insert(std::move(key), std::move(value)); + auto diff = new_region_data.insert(std::move(key), std::move(value)); + total_diff.add(diff); } - return cf_data_size; + return total_diff; } template diff --git a/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.h b/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.h index a163ababbd3..361f0308783 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.h +++ b/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.h @@ -18,14 +18,44 @@ #include #include -#include - namespace DB { struct TiKVRangeKey; using RegionRange = RegionRangeKeys::RegionRange; -using RegionDataRes = int64_t; +struct RegionDataMemDiff +{ + using Type = Int64; + Type payload; + Type decoded; + + RegionDataMemDiff(Type payload_, Type decoded_) + : payload(payload_) + , decoded(decoded_) + {} + RegionDataMemDiff(UInt64 payload_, UInt64 decoded_) + : payload(static_cast(payload_)) + , decoded(static_cast(decoded_)) + {} + RegionDataMemDiff() + : payload(0) + , decoded(0) + {} + + RegionDataMemDiff negative() const { return {-payload, -decoded}; } + + void add(const RegionDataMemDiff & other) + { + payload += other.payload; + decoded += other.decoded; + } + + void sub(const RegionDataMemDiff & other) + { + payload -= other.payload; + decoded -= other.decoded; + } +}; enum class DupCheck { @@ -43,17 +73,11 @@ struct RegionCFDataBase using Pair = std::pair; using Status = bool; - static const TiKVKey & getTiKVKey(const Value & val); - - static const TiKVValue & getTiKVValue(const Value & val); - - RegionDataRes insert(TiKVKey && key, TiKVValue && value, DupCheck mode = DupCheck::Deny); - - static size_t calcTiKVKeyValueSize(const Value & value); + RegionDataMemDiff insert(TiKVKey && key, TiKVValue && value, DupCheck mode = DupCheck::Deny); - static size_t calcTiKVKeyValueSize(const TiKVKey & key, const TiKVValue & value); + static RegionDataMemDiff calcTotalKVSize(const Value & value); - size_t remove(const Key & key, bool quiet = false); + RegionDataMemDiff remove(const Key & key, bool quiet = false); static bool cmp(const Map & a, const Map & b); @@ -62,23 +86,24 @@ struct RegionCFDataBase size_t getSize() const; RegionCFDataBase() = default; - RegionCFDataBase(RegionCFDataBase && region); - RegionCFDataBase & operator=(RegionCFDataBase && region); + RegionCFDataBase(RegionCFDataBase && region) noexcept; + RegionCFDataBase & operator=(RegionCFDataBase && region) noexcept; - size_t splitInto(const RegionRange & range, RegionCFDataBase & new_region_data); - size_t mergeFrom(const RegionCFDataBase & ori_region_data); + RegionDataMemDiff splitInto(const RegionRange & range, RegionCFDataBase & new_region_data); + RegionDataMemDiff mergeFrom(const RegionCFDataBase & ori_region_data); size_t serialize(WriteBuffer & buf) const; - - static size_t deserialize(ReadBuffer & buf, RegionCFDataBase & new_region_data); + static RegionDataMemDiff deserialize(ReadBuffer & buf, RegionCFDataBase & new_region_data); const Data & getData() const; Data & getDataMut(); private: + static const TiKVKey & getTiKVKey(const Value & val); + static const TiKVValue & getTiKVValue(const Value & val); + static bool shouldIgnoreRemove(const Value & value); - RegionDataRes insert(std::pair && kv_pair, DupCheck mode = DupCheck::Deny); private: Data data; diff --git a/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataTrait.h b/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataTrait.h index 0b57649622d..7040ed15c51 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataTrait.h +++ b/dbms/src/Storages/KVStore/MultiRaft/RegionCFDataTrait.h @@ -22,16 +22,6 @@ namespace DB { -struct CFKeyHasher -{ - size_t operator()(const std::pair & k) const noexcept - { - const static Timestamp mask = std::numeric_limits::max() << 40 >> 40; - size_t res = k.first << 24 | (k.second & mask); - return res; - } -}; - struct RegionWriteCFDataTrait { using DecodedWriteCFValue = RecordKVFormat::InnerDecodedWriteCFValue; diff --git a/dbms/src/Storages/KVStore/MultiRaft/RegionData.cpp b/dbms/src/Storages/KVStore/MultiRaft/RegionData.cpp index 96ad9fa68ad..d661d2bf8b0 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/RegionData.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/RegionData.cpp @@ -28,65 +28,60 @@ extern const int LOGICAL_ERROR; extern const int ILLFORMAT_RAFT_ROW; } // namespace ErrorCodes -void RegionData::reportAlloc(size_t delta) +void RegionData::recordMemChange(const RegionDataMemDiff & delta) { - root_of_kvstore_mem_trackers->alloc(delta, false); + if (delta.payload > 0) + { + root_of_kvstore_mem_trackers->alloc(delta.payload, false); + } + else if (delta.payload < 0) + { + root_of_kvstore_mem_trackers->free(-delta.payload); + } } -void RegionData::reportDealloc(size_t delta) +void RegionData::updateMemoryUsage(const RegionDataMemDiff & delta) { - root_of_kvstore_mem_trackers->free(delta); + cf_data_size += delta.payload; + decoded_data_size += delta.decoded; } -void RegionData::reportDelta(size_t prev, size_t current) +void RegionData::resetMemoryUsage() { - if (current >= prev) - { - root_of_kvstore_mem_trackers->alloc(current - prev, false); - } - else - { - root_of_kvstore_mem_trackers->free(prev - current); - } + cf_data_size = 0; + decoded_data_size = 0; } -RegionDataRes RegionData::insert(ColumnFamilyType cf, TiKVKey && key, TiKVValue && value, DupCheck mode) +RegionDataMemDiff RegionData::insert(ColumnFamilyType cf, TiKVKey && key, TiKVValue && value, DupCheck mode) { + RegionDataMemDiff delta; switch (cf) { case ColumnFamilyType::Write: { - auto delta = write_cf.insert(std::move(key), std::move(value), mode); - cf_data_size += delta; - reportAlloc(delta); - return delta; + delta = write_cf.insert(std::move(key), std::move(value), mode); + break; } case ColumnFamilyType::Default: { - auto delta = default_cf.insert(std::move(key), std::move(value), mode); - cf_data_size += delta; - reportAlloc(delta); - return delta; + delta = default_cf.insert(std::move(key), std::move(value), mode); + break; } case ColumnFamilyType::Lock: { - auto delta = lock_cf.insert(std::move(key), std::move(value), mode); - cf_data_size += delta; - if likely (delta >= 0) - { - reportAlloc(delta); - } - else - { - reportDealloc(-delta); - } - return 0; + // By inserting a lock, a old lock of the same key could be replaced. For example, pessimistic lock -> optimistic lock. + delta = lock_cf.insert(std::move(key), std::move(value), mode); + break; } } + recordMemChange(delta); + updateMemoryUsage(delta); + return delta; } void RegionData::remove(ColumnFamilyType cf, const TiKVKey & key) { + RegionDataMemDiff delta; switch (cf) { case ColumnFamilyType::Write: @@ -95,10 +90,8 @@ void RegionData::remove(ColumnFamilyType cf, const TiKVKey & key) auto pk = RecordKVFormat::getRawTiDBPK(raw_key); Timestamp ts = RecordKVFormat::getTs(key); // removed by gc, may not exist. - auto delta = write_cf.remove(RegionWriteCFData::Key{pk, ts}, true); - cf_data_size -= delta; - reportDealloc(delta); - return; + delta = write_cf.remove(RegionWriteCFData::Key{pk, ts}, true); + break; } case ColumnFamilyType::Default: { @@ -106,20 +99,17 @@ void RegionData::remove(ColumnFamilyType cf, const TiKVKey & key) auto pk = RecordKVFormat::getRawTiDBPK(raw_key); Timestamp ts = RecordKVFormat::getTs(key); // removed by gc, may not exist. - auto delta = default_cf.remove(RegionDefaultCFData::Key{pk, ts}, true); - cf_data_size -= delta; - reportDealloc(delta); - return; + delta = default_cf.remove(RegionDefaultCFData::Key{pk, ts}, true); + break; } case ColumnFamilyType::Lock: { - auto delta - = lock_cf.remove(RegionLockCFDataTrait::Key{nullptr, std::string_view(key.data(), key.dataSize())}, true); - cf_data_size -= delta; - reportDealloc(delta); - return; + delta = lock_cf.remove(RegionLockCFDataTrait::Key{nullptr, std::string_view(key.data(), key.dataSize())}, true); + break; } } + recordMemChange(delta); + updateMemoryUsage(delta); } RegionData::WriteCFIter RegionData::removeDataByWriteIt(const WriteCFIter & write_it) @@ -131,22 +121,22 @@ RegionData::WriteCFIter RegionData::removeDataByWriteIt(const WriteCFIter & writ std::ignore = value; std::ignore = key; + RegionDataMemDiff delta; + if (decoded_val.write_type == RecordKVFormat::CFModifyFlag::PutFlag) { auto & map = default_cf.getDataMut(); if (auto data_it = map.find({pk, decoded_val.prewrite_ts}); data_it != map.end()) { - auto delta = RegionDefaultCFData::calcTiKVKeyValueSize(data_it->second); - cf_data_size -= delta; + delta.sub(RegionDefaultCFData::calcTotalKVSize(data_it->second)); map.erase(data_it); - reportDealloc(delta); } } - auto delta = RegionWriteCFData::calcTiKVKeyValueSize(write_it->second); - cf_data_size -= delta; - reportDealloc(delta); + delta.sub(RegionWriteCFData::calcTotalKVSize(write_it->second)); + recordMemChange(delta); + updateMemoryUsage(delta); return write_cf.getDataMut().erase(write_it); } @@ -258,23 +248,24 @@ std::shared_ptr RegionData::getLockByKey(const TiKVKey & key) c void RegionData::splitInto(const RegionRange & range, RegionData & new_region_data) { - size_t size_changed = 0; - size_changed += default_cf.splitInto(range, new_region_data.default_cf); - size_changed += write_cf.splitInto(range, new_region_data.write_cf); - // reportAlloc: Remember to track memory here if we have a region-wise metrics later. - size_changed += lock_cf.splitInto(range, new_region_data.lock_cf); - cf_data_size -= size_changed; - new_region_data.cf_data_size += size_changed; + RegionDataMemDiff size_changed; + size_changed.add(default_cf.splitInto(range, new_region_data.default_cf)); + size_changed.add(write_cf.splitInto(range, new_region_data.write_cf)); + // recordMemChange: Remember to track memory here if we have a region-wise metrics later. + size_changed.add(lock_cf.splitInto(range, new_region_data.lock_cf)); + updateMemoryUsage(size_changed); + new_region_data.updateMemoryUsage(size_changed.negative()); } void RegionData::mergeFrom(const RegionData & ori_region_data) { - size_t size_changed = 0; - size_changed += default_cf.mergeFrom(ori_region_data.default_cf); - size_changed += write_cf.mergeFrom(ori_region_data.write_cf); - // reportAlloc: Remember to track memory here if we have a region-wise metrics later. - size_changed += lock_cf.mergeFrom(ori_region_data.lock_cf); - cf_data_size += size_changed; + RegionDataMemDiff size_changed; + size_changed.add(default_cf.mergeFrom(ori_region_data.default_cf)); + size_changed.add(write_cf.mergeFrom(ori_region_data.write_cf)); + // recordMemChange: Remember to track memory here if we have a region-wise metrics later. + size_changed.add(lock_cf.mergeFrom(ori_region_data.lock_cf)); + updateMemoryUsage(size_changed); + // `mergeFrom` won't delete from source region. So we don't update it here. } size_t RegionData::dataSize() const @@ -282,14 +273,23 @@ size_t RegionData::dataSize() const return cf_data_size; } -void RegionData::assignRegionData(RegionData && new_region_data) +size_t RegionData::totalSize() const +{ + return cf_data_size + decoded_data_size; +} + +void RegionData::assignRegionData(RegionData && rhs) { - default_cf = std::move(new_region_data.default_cf); - write_cf = std::move(new_region_data.write_cf); - lock_cf = std::move(new_region_data.lock_cf); - orphan_keys_info = std::move(new_region_data.orphan_keys_info); + recordMemChange(RegionDataMemDiff{-cf_data_size, -decoded_data_size}); + resetMemoryUsage(); - cf_data_size = new_region_data.cf_data_size.load(); + default_cf = std::move(rhs.default_cf); + write_cf = std::move(rhs.write_cf); + lock_cf = std::move(rhs.lock_cf); + orphan_keys_info = std::move(rhs.orphan_keys_info); + + updateMemoryUsage(RegionDataMemDiff{rhs.cf_data_size, rhs.decoded_data_size}); + rhs.resetMemoryUsage(); } size_t RegionData::serialize(WriteBuffer & buf) const @@ -305,19 +305,21 @@ size_t RegionData::serialize(WriteBuffer & buf) const void RegionData::deserialize(ReadBuffer & buf, RegionData & region_data) { - size_t total_size = 0; - total_size += RegionDefaultCFData::deserialize(buf, region_data.default_cf); - total_size += RegionWriteCFData::deserialize(buf, region_data.write_cf); - total_size += RegionLockCFData::deserialize(buf, region_data.lock_cf); + RegionDataMemDiff size_changed; - region_data.cf_data_size = total_size; - reportAlloc(total_size); + size_changed.add(RegionDefaultCFData::deserialize(buf, region_data.default_cf)); + size_changed.add(RegionWriteCFData::deserialize(buf, region_data.write_cf)); + size_changed.add(RegionLockCFData::deserialize(buf, region_data.lock_cf)); + + region_data.updateMemoryUsage(size_changed); + region_data.recordMemChange(size_changed); } RegionWriteCFData & RegionData::writeCF() { return write_cf; } + RegionDefaultCFData & RegionData::defaultCF() { return default_cf; @@ -327,10 +329,12 @@ const RegionWriteCFData & RegionData::writeCF() const { return write_cf; } + const RegionDefaultCFData & RegionData::defaultCF() const { return default_cf; } + const RegionLockCFData & RegionData::lockCF() const { return lock_cf; @@ -342,33 +346,59 @@ bool RegionData::isEqual(const RegionData & r2) const && cf_data_size == r2.cf_data_size; } -RegionData::RegionData(RegionData && data) +RegionData::RegionData(RegionData && data) noexcept : write_cf(std::move(data.write_cf)) , default_cf(std::move(data.default_cf)) , lock_cf(std::move(data.lock_cf)) , cf_data_size(data.cf_data_size.load()) + , decoded_data_size(data.decoded_data_size.load()) {} - RegionData::~RegionData() { - reportDealloc(cf_data_size); - cf_data_size = 0; + recordMemChange(RegionDataMemDiff{-cf_data_size, 0}); + updateMemoryUsage(RegionDataMemDiff{-cf_data_size, 0}); } -RegionData & RegionData::operator=(RegionData && rhs) +String RegionData::summary() const { - write_cf = std::move(rhs.write_cf); - default_cf = std::move(rhs.default_cf); - lock_cf = std::move(rhs.lock_cf); - reportDelta(cf_data_size, rhs.cf_data_size.load()); - cf_data_size = rhs.cf_data_size.load(); - return *this; + return fmt::format("write:{},lock:{},default:{}", write_cf.getSize(), lock_cf.getSize(), default_cf.getSize()); } -String RegionData::summary() const +size_t RegionData::tryCompactionFilter(Timestamp safe_point) { - return fmt::format("write:{},lock:{},default:{}", write_cf.getSize(), lock_cf.getSize(), default_cf.getSize()); + RegionDataMemDiff delta; + size_t del_write = 0; + auto & write_map = write_cf.getDataMut(); + auto & default_map = default_cf.getDataMut(); + for (auto write_map_it = write_map.begin(); write_map_it != write_map.end();) + { + const auto & decoded_val = std::get<2>(write_map_it->second); + const auto & [pk, ts] = write_map_it->first; + + if (decoded_val.write_type == RecordKVFormat::CFModifyFlag::PutFlag) + { + if (!decoded_val.short_value) + { + if (auto data_it = default_map.find({pk, decoded_val.prewrite_ts}); data_it == default_map.end()) + { + // if key-val in write cf can not find matched data in default cf and its commit-ts < gc-safe-point, we can clean it safely. + if (ts < safe_point) + { + del_write += 1; + delta.sub(RegionWriteCFData::calcTotalKVSize(write_map_it->second)); + write_map_it = write_map.erase(write_map_it); + continue; + } + } + } + } + ++write_map_it; + } + recordMemChange(delta); + updateMemoryUsage(delta); + // No need to check default cf. Because tikv will gc default cf before write cf. + return del_write; } } // namespace DB diff --git a/dbms/src/Storages/KVStore/MultiRaft/RegionData.h b/dbms/src/Storages/KVStore/MultiRaft/RegionData.h index ec68df3f82b..86f3785096c 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/RegionData.h +++ b/dbms/src/Storages/KVStore/MultiRaft/RegionData.h @@ -38,15 +38,10 @@ class RegionData using ConstWriteCFIter = RegionWriteCFData::Map::const_iterator; using LockInfoPtr = std::unique_ptr; - static void reportAlloc(size_t delta); - static void reportDealloc(size_t delta); - static void reportDelta(size_t prev, size_t current); - - RegionDataRes insert(ColumnFamilyType cf, TiKVKey && key, TiKVValue && value, DupCheck mode = DupCheck::Deny); + RegionDataMemDiff insert(ColumnFamilyType cf, TiKVKey && key, TiKVValue && value, DupCheck mode = DupCheck::Deny); void remove(ColumnFamilyType cf, const TiKVKey & key); WriteCFIter removeDataByWriteIt(const WriteCFIter & write_it); - std::optional readDataByWriteIt( const ConstWriteCFIter & write_it, bool need_value, @@ -55,22 +50,21 @@ class RegionData bool hard_error); LockInfoPtr getLockInfo(const RegionLockReadQuery & query) const; - std::shared_ptr getLockByKey(const TiKVKey & key) const; void splitInto(const RegionRange & range, RegionData & new_region_data); void mergeFrom(const RegionData & ori_region_data); + // Payload size in RegionData, show how much data flows in/out of the Region. size_t dataSize() const; - - void assignRegionData(RegionData && new_region_data); + // Reflects most of bytes of memory currently occupied by this object. + // It is `dataSize()` and the decoded data cached. + size_t totalSize() const; size_t serialize(WriteBuffer & buf) const; - static void deserialize(ReadBuffer & buf, RegionData & region_data); friend bool operator==(const RegionData & r1, const RegionData & r2) { return r1.isEqual(r2); } - bool isEqual(const RegionData & r2) const; RegionWriteCFData & writeCF(); @@ -83,10 +77,13 @@ class RegionData RegionData() = default; ~RegionData(); - RegionData(RegionData && data); - RegionData & operator=(RegionData &&); + RegionData(RegionData && data) noexcept; + RegionData & operator=(RegionData &&) = delete; // explicit use `assignRegionData` instead + void assignRegionData(RegionData && rhs); String summary() const; + size_t tryCompactionFilter(Timestamp safe_point); + struct OrphanKeysInfo { // Protected by region task lock. @@ -121,6 +118,13 @@ class RegionData std::unordered_set remained_keys; }; +private: + // The memory difference to the KVStore. + static void recordMemChange(const RegionDataMemDiff &); + // The memory difference to this Region. + void updateMemoryUsage(const RegionDataMemDiff &); + void resetMemoryUsage(); + private: friend class Region; @@ -130,8 +134,10 @@ class RegionData RegionLockCFData lock_cf; OrphanKeysInfo orphan_keys_info; - // Size of data cf & write cf, with lock cf. + // Size of 3 cfs, reflects size of real payload flows to KVStore. std::atomic cf_data_size = 0; + // Size of decoded structures for convenient access, considered as amplification in memory. + std::atomic decoded_data_size = 0; }; } // namespace DB diff --git a/dbms/src/Storages/KVStore/Region.cpp b/dbms/src/Storages/KVStore/Region.cpp index 0d27195f2a1..bfbcbab45ee 100644 --- a/dbms/src/Storages/KVStore/Region.cpp +++ b/dbms/src/Storages/KVStore/Region.cpp @@ -67,7 +67,7 @@ void Region::insert(ColumnFamilyType type, TiKVKey && key, TiKVValue && value, D doInsert(type, std::move(key), std::move(value), mode); } -RegionDataRes Region::doInsert(ColumnFamilyType type, TiKVKey && key, TiKVValue && value, DupCheck mode) +RegionDataMemDiff Region::doInsert(ColumnFamilyType type, TiKVKey && key, TiKVValue && value, DupCheck mode) { if unlikely (getClusterRaftstoreVer() == RaftstoreVer::V2) { @@ -77,12 +77,11 @@ RegionDataRes Region::doInsert(ColumnFamilyType type, TiKVKey && key, TiKVValue { // We can't assert the key exists in write_cf here, // since it may be already written into DeltaTree. - return 0; + return RegionDataMemDiff{}; } } } - auto ans = data.insert(type, std::move(key), std::move(value), mode); - return ans; + return data.insert(type, std::move(key), std::move(value), mode); } void Region::remove(const std::string & cf, const TiKVKey & key) @@ -99,7 +98,7 @@ void Region::doRemove(ColumnFamilyType type, const TiKVKey & key) void Region::clearAllData() { std::unique_lock lock(mutex); - data = RegionData(); + data.assignRegionData(RegionData()); } UInt64 Region::appliedIndex() const @@ -294,35 +293,7 @@ void Region::assignRegion(Region && new_region) /// try to clean illegal data because of feature `compaction filter` void Region::tryCompactionFilter(const Timestamp safe_point) { - size_t del_write = 0; - auto & write_map = data.writeCF().getDataMut(); - auto & default_map = data.defaultCF().getDataMut(); - for (auto write_map_it = write_map.begin(); write_map_it != write_map.end();) - { - const auto & decoded_val = std::get<2>(write_map_it->second); - const auto & [pk, ts] = write_map_it->first; - - if (decoded_val.write_type == RecordKVFormat::CFModifyFlag::PutFlag) - { - if (!decoded_val.short_value) - { - if (auto data_it = default_map.find({pk, decoded_val.prewrite_ts}); data_it == default_map.end()) - { - // if key-val in write cf can not find matched data in default cf and its commit-ts < gc-safe-point, we can clean it safely. - if (ts < safe_point) - { - del_write += 1; - data.cf_data_size -= RegionWriteCFData::calcTiKVKeyValueSize(write_map_it->second); - write_map_it = write_map.erase(write_map_it); - continue; - } - } - } - } - ++write_map_it; - } - // No need to check default cf. Because tikv will gc default cf before write cf. - if (del_write) + if (size_t del_write = data.tryCompactionFilter(safe_point); del_write) { LOG_INFO(log, "delete {} records in write cf for region_id={}", del_write, meta.regionId()); } diff --git a/dbms/src/Storages/KVStore/Region.h b/dbms/src/Storages/KVStore/Region.h index db766a2c413..da5ed6166d9 100644 --- a/dbms/src/Storages/KVStore/Region.h +++ b/dbms/src/Storages/KVStore/Region.h @@ -163,6 +163,7 @@ class Region : public std::enable_shared_from_this bool isMerging() const; void setStateApplying(); + // Payload size in RegionData, show how much data flows in/out of the Region. size_t dataSize() const; size_t writeCFCount() const; std::string dataInfo() const; @@ -280,7 +281,7 @@ class Region : public std::enable_shared_from_this // Private methods no need to lock mutex, normally // Returns the size of data change(inc or dec) - RegionDataRes doInsert(ColumnFamilyType type, TiKVKey && key, TiKVValue && value, DupCheck mode); + RegionDataMemDiff doInsert(ColumnFamilyType type, TiKVKey && key, TiKVValue && value, DupCheck mode); void doRemove(ColumnFamilyType type, const TiKVKey & key); std::optional readDataByWriteIt( diff --git a/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp b/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp index abb4da11afb..92f7bf65001 100644 --- a/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp +++ b/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.cpp @@ -204,6 +204,11 @@ bool DecodedLockCFValue::isLargeTxn() const return inner == nullptr; } +size_t DecodedLockCFValue::getSize() const +{ + return sizeof(DecodedLockCFValue) + (isLargeTxn() ? 0 : sizeof(DecodedLockCFValue::Inner)); +} + void DecodedLockCFValue::Inner::intoLockInfo(const std::shared_ptr & key, kvrpcpb::LockInfo & res) const { res.set_lock_type(lock_type); diff --git a/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h b/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h index 508f7a45081..0b641a5ce3e 100644 --- a/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h +++ b/dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h @@ -52,6 +52,7 @@ struct DecodedLockCFValue : boost::noncopyable void withInner(std::function && f) const; /// Return LockInfoPtr if the `query` could be blocked by this lock. Otherwise return nullptr. LockInfoPtr getLockInfoPtr(const RegionLockReadQuery & query) const; + size_t getSize() const; std::shared_ptr key; std::shared_ptr val; diff --git a/dbms/src/Storages/KVStore/tests/bench_parse_lock.cpp b/dbms/src/Storages/KVStore/tests/bench_parse_lock.cpp index 63febc20ec1..69032ad1c22 100644 --- a/dbms/src/Storages/KVStore/tests/bench_parse_lock.cpp +++ b/dbms/src/Storages/KVStore/tests/bench_parse_lock.cpp @@ -27,71 +27,6 @@ using DB::RecordKVFormat::DecodedLockCFValue; DecodedLockCFValue::Inner * decodeLockCfValue(const DecodedLockCFValue & decoded); -TiKVValue encode_lock_cf_value( - UInt8 lock_type, - const String & primary, - Timestamp ts, - UInt64 ttl, - const String * short_value, - Timestamp min_commit_ts, - Timestamp for_update_ts, - uint64_t txn_size, - const std::vector & async_commit, - const std::vector & rollback, - UInt64 generation = 0) -{ - auto lock_value = RecordKVFormat::encodeLockCfValue(lock_type, primary, ts, ttl, short_value, min_commit_ts); - WriteBufferFromOwnString res; - res.write(lock_value.getStr().data(), lock_value.getStr().size()); - { - res.write(RecordKVFormat::MIN_COMMIT_TS_PREFIX); - RecordKVFormat::encodeUInt64(min_commit_ts, res); - } - { - res.write(RecordKVFormat::FOR_UPDATE_TS_PREFIX); - RecordKVFormat::encodeUInt64(for_update_ts, res); - } - { - res.write(RecordKVFormat::TXN_SIZE_PREFIX); - RecordKVFormat::encodeUInt64(txn_size, res); - } - { - res.write(RecordKVFormat::ROLLBACK_TS_PREFIX); - TiKV::writeVarUInt(rollback.size(), res); - for (auto ts : rollback) - { - RecordKVFormat::encodeUInt64(ts, res); - } - } - { - res.write(RecordKVFormat::ASYNC_COMMIT_PREFIX); - TiKV::writeVarUInt(async_commit.size(), res); - for (const auto & s : async_commit) - { - writeVarInt(s.size(), res); - res.write(s.data(), s.size()); - } - } - { - res.write(RecordKVFormat::LAST_CHANGE_PREFIX); - RecordKVFormat::encodeUInt64(12345678, res); - TiKV::writeVarUInt(87654321, res); - } - { - res.write(RecordKVFormat::TXN_SOURCE_PREFIX_FOR_LOCK); - TiKV::writeVarUInt(876543, res); - } - { - res.write(RecordKVFormat::PESSIMISTIC_LOCK_WITH_CONFLICT_PREFIX); - } - if (generation > 0) - { - // res.write(RecordKVFormat::GENERATION_PREFIX); - // RecordKVFormat::encodeUInt64(generation, res); - } - return TiKVValue(res.releaseStr()); -} - void parseTest(benchmark::State & state) { try @@ -100,7 +35,7 @@ void parseTest(benchmark::State & state) auto lock_for_update_ts = 7777, txn_size = 1; const std::vector & async_commit = {"s1", "s2"}; const std::vector & rollback = {3, 4}; - auto lock_value2 = encode_lock_cf_value( + auto lock_value2 = encodeFullLockCfValue( Region::DelFlag, "primary key", 421321, diff --git a/dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp b/dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp index 0b8ebddc627..5264700fb78 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp @@ -608,7 +608,7 @@ void RegionKVStoreOldTest::testRaftMerge(Context & ctx, KVStore & kvs, TMTContex TEST_F(RegionKVStoreOldTest, RegionReadWrite) { - auto ctx = TiFlashTestEnv::getGlobalContext(); + auto & ctx = TiFlashTestEnv::getGlobalContext(); TableID table_id = 100; KVStore & kvs = getKVS(); UInt64 region_id = 1; diff --git a/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp b/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp index b9ac4e1eba5..4ad8ee27ff2 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -35,6 +36,7 @@ extern std::shared_ptr root_of_kvstore_mem_trackers; namespace DB::tests { +using namespace DB::RecordKVFormat; TEST_F(RegionKVStoreTest, RegionStruct) try @@ -84,14 +86,17 @@ try auto str_lock_value = RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", 111, 999).toString(); MockRaftStoreProxy::FailCond cond; + const auto decoded_lock_size = sizeof(DecodedLockCFValue) + sizeof(DecodedLockCFValue::Inner); proxy_instance->debugAddRegions( kvs, ctx.getTMTContext(), - {1, 2}, + {1, 2, 3}, {{RecordKVFormat::genKey(table_id, 0), RecordKVFormat::genKey(table_id, 10)}, - {RecordKVFormat::genKey(table_id, 11), RecordKVFormat::genKey(table_id, 20)}}); + {RecordKVFormat::genKey(table_id, 11), RecordKVFormat::genKey(table_id, 20)}, + {RecordKVFormat::genKey(table_id, 21), RecordKVFormat::genKey(table_id, 30)}}); { + // default auto region_id = 1; auto kvr1 = kvs.getRegion(region_id); auto [index, term] @@ -100,38 +105,98 @@ try UNUSED(term); proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index); ASSERT_EQ(root_of_kvstore_mem_trackers->get(), str_key.dataSize() + str_val_default.size()); + ASSERT_EQ(kvr1->dataSize(), root_of_kvstore_mem_trackers->get()); + ASSERT_EQ(kvr1->dataSize(), kvr1->getData().totalSize()); + } + { + // lock + root_of_kvstore_mem_trackers->reset(); + auto region_id = 2; + auto kvr1 = kvs.getRegion(region_id); + auto [index, term] + = proxy_instance + ->rawWrite(region_id, {str_key}, {str_lock_value}, {WriteCmdType::Put}, {ColumnFamilyType::Lock}); + UNUSED(term); + proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index); + ASSERT_EQ(root_of_kvstore_mem_trackers->get(), str_key.dataSize() + str_lock_value.size()); + ASSERT_EQ(kvr1->dataSize(), root_of_kvstore_mem_trackers->get()); + ASSERT_EQ(kvr1->dataSize() + decoded_lock_size, kvr1->getData().totalSize()); } - { + // lock with largetxn + root_of_kvstore_mem_trackers->reset(); + auto region_id = 3; + auto kvr1 = kvs.getRegion(region_id); + ASSERT_NE(kvr1, nullptr); + std::string shor_value = "value"; + auto lock_for_update_ts = 7777, txn_size = 1; + const std::vector & async_commit = {"s1", "s2"}; + const std::vector & rollback = {3, 4}; + auto lock_value2 = DB::RegionBench::encodeFullLockCfValue( + Region::DelFlag, + "primary key", + 421321, + std::numeric_limits::max(), + &shor_value, + 66666, + lock_for_update_ts, + txn_size, + async_commit, + rollback, + 1111); + auto [index, term] + = proxy_instance + ->rawWrite(region_id, {str_key}, {str_lock_value}, {WriteCmdType::Put}, {ColumnFamilyType::Lock}); + UNUSED(term); + proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index); + ASSERT_EQ(root_of_kvstore_mem_trackers->get(), str_key.dataSize() + str_lock_value.size()); + ASSERT_EQ(kvr1->dataSize(), root_of_kvstore_mem_trackers->get()); + ASSERT_EQ(kvr1->dataSize() + decoded_lock_size, kvr1->getData().totalSize()); + } + { + // insert & remove root_of_kvstore_mem_trackers->reset(); RegionPtr region = tests::makeRegion(700, start, end, proxy_helper.get()); region->insert("default", TiKVKey::copyFrom(str_key), TiKVValue::copyFrom(str_val_default)); ASSERT_EQ(root_of_kvstore_mem_trackers->get(), str_key.dataSize() + str_val_default.size()); region->remove("default", TiKVKey::copyFrom(str_key)); ASSERT_EQ(root_of_kvstore_mem_trackers->get(), 0); + ASSERT_EQ(region->dataSize(), root_of_kvstore_mem_trackers->get()); + ASSERT_EQ(region->dataSize(), region->getData().totalSize()); } ASSERT_EQ(root_of_kvstore_mem_trackers->get(), 0); { + // insert root_of_kvstore_mem_trackers->reset(); RegionPtr region = tests::makeRegion(701, start, end, proxy_helper.get()); region->insert("default", TiKVKey::copyFrom(str_key), TiKVValue::copyFrom(str_val_default)); ASSERT_EQ(root_of_kvstore_mem_trackers->get(), str_key.dataSize() + str_val_default.size()); + ASSERT_EQ(region->dataSize(), root_of_kvstore_mem_trackers->get()); + ASSERT_EQ(region->dataSize(), region->getData().totalSize()); } ASSERT_EQ(root_of_kvstore_mem_trackers->get(), 0); { + // reload + proxy_instance->debugAddRegions( + kvs, + ctx.getTMTContext(), + {702}, + {{RecordKVFormat::genKey(table_id, 7020), RecordKVFormat::genKey(table_id, 7030)}}); root_of_kvstore_mem_trackers->reset(); - RegionPtr region = tests::makeRegion(702, start, end, proxy_helper.get()); + RegionPtr region = kvs.getRegion(702); region->insert("default", TiKVKey::copyFrom(str_key), TiKVValue::copyFrom(str_val_default)); ASSERT_EQ(root_of_kvstore_mem_trackers->get(), str_key.dataSize() + str_val_default.size()); - // 702 is not registed, so we persist as 1. - tryPersistRegion(kvs, 1); + tryPersistRegion(kvs, 702); root_of_kvstore_mem_trackers->reset(); ASSERT_EQ(root_of_kvstore_mem_trackers->get(), 0); reloadKVSFromDisk(false); ASSERT_EQ(root_of_kvstore_mem_trackers->get(), str_key.dataSize() + str_val_default.size()); + ASSERT_EQ(region->dataSize(), root_of_kvstore_mem_trackers->get()); + ASSERT_EQ(region->dataSize(), region->getData().totalSize()); } ASSERT_EQ(root_of_kvstore_mem_trackers->get(), 0); { + // commit root_of_kvstore_mem_trackers->reset(); RegionPtr region = tests::makeRegion(800, start, end, proxy_helper.get()); region->insert("default", TiKVKey::copyFrom(str_key), TiKVValue::copyFrom(str_val_default)); @@ -142,17 +207,21 @@ try ASSERT_EQ(1, data_list_read->size()); RemoveRegionCommitCache(region, *data_list_read); ASSERT_EQ(root_of_kvstore_mem_trackers->get(), 0); + ASSERT_EQ(region->dataSize(), root_of_kvstore_mem_trackers->get()); + ASSERT_EQ(region->dataSize(), region->getData().totalSize()); } ASSERT_EQ(root_of_kvstore_mem_trackers->get(), 0); { + // split & merge root_of_kvstore_mem_trackers->reset(); RegionPtr region = tests::makeRegion(900, start, end, proxy_helper.get()); region->insert("default", TiKVKey::copyFrom(str_key), TiKVValue::copyFrom(str_val_default)); - auto str_key2 = RecordKVFormat::genKey(table_id, 20, 111); + auto str_key2 = RecordKVFormat::genKey(table_id, 80, 111); auto [str_val_write2, str_val_default2] = proxy_instance->generateTiKVKeyValue(111, 999); region->insert("default", TiKVKey::copyFrom(str_key2), TiKVValue::copyFrom(str_val_default2)); auto expected = str_key.dataSize() + str_val_default.size() + str_key2.dataSize() + str_val_default2.size(); ASSERT_EQ(root_of_kvstore_mem_trackers->get(), expected); + ASSERT_EQ(region->dataSize(), expected); auto new_region = splitRegion( region, RegionMeta( @@ -160,16 +229,24 @@ try createRegionInfo(902, RecordKVFormat::genKey(table_id, 50), end), initialApplyState())); ASSERT_EQ(root_of_kvstore_mem_trackers->get(), expected); + ASSERT_EQ(region->dataSize(), str_key.dataSize() + str_val_default.size()); + ASSERT_EQ(new_region->dataSize(), str_key2.dataSize() + str_val_default2.size()); + ASSERT_EQ(region->dataSize(), region->getData().totalSize()); + ASSERT_EQ(new_region->dataSize(), new_region->getData().totalSize()); region->mergeDataFrom(*new_region); ASSERT_EQ(root_of_kvstore_mem_trackers->get(), expected); + ASSERT_EQ(region->dataSize(), expected); + ASSERT_EQ(region->dataSize(), region->getData().totalSize()); + ASSERT_EQ(new_region->dataSize(), new_region->getData().totalSize()); } { + // split & merge with lock root_of_kvstore_mem_trackers->reset(); RegionPtr region = tests::makeRegion(1000, start, end, proxy_helper.get()); region->insert("lock", TiKVKey::copyFrom(str_key), TiKVValue::copyFrom(str_lock_value)); auto expected = str_key.dataSize() + str_lock_value.size(); ASSERT_EQ(root_of_kvstore_mem_trackers->get(), expected); - auto str_key2 = RecordKVFormat::genKey(table_id, 20, 111); + auto str_key2 = RecordKVFormat::genKey(table_id, 80, 111); std::string short_value(97, 'a'); auto str_lock_value2 = RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", 20, 111, &short_value) @@ -184,8 +261,16 @@ try createRegionInfo(1002, RecordKVFormat::genKey(table_id, 50), end), initialApplyState())); ASSERT_EQ(root_of_kvstore_mem_trackers->get(), expected); + ASSERT_EQ(region->dataSize(), str_key.dataSize() + str_lock_value.size()); + ASSERT_EQ(region->getData().totalSize(), region->dataSize() + decoded_lock_size); + ASSERT_EQ(new_region->dataSize(), str_key2.dataSize() + str_lock_value2.size()); + ASSERT_EQ(new_region->getData().totalSize(), new_region->dataSize() + decoded_lock_size); region->mergeDataFrom(*new_region); ASSERT_EQ(root_of_kvstore_mem_trackers->get(), expected); + ASSERT_EQ(region->dataSize(), expected); + ASSERT_EQ(region->getData().totalSize(), region->dataSize() + 2 * decoded_lock_size); + + // replace a lock region->insert("lock", TiKVKey::copyFrom(str_key2), TiKVValue::copyFrom(str_lock_value2)); auto str_lock_value2_2 = RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", 20, 111).toString(); @@ -193,6 +278,65 @@ try expected -= short_value.size(); expected -= 2; // Short value prefix and length ASSERT_EQ(root_of_kvstore_mem_trackers->get(), expected); + ASSERT_EQ(region->dataSize(), expected); + ASSERT_EQ(region->getData().totalSize(), region->dataSize() + 2 * decoded_lock_size); + } + { + // insert & snapshot + UInt64 region_id = 1100; + root_of_kvstore_mem_trackers->reset(); + proxy_instance->debugAddRegions( + kvs, + ctx.getTMTContext(), + {region_id}, + {{{RecordKVFormat::genKey(table_id, 1100), RecordKVFormat::genKey(table_id, 1200)}}}); + + RegionPtr region = kvs.getRegion(region_id); + ASSERT_NE(region, nullptr); + auto str_key2 = RecordKVFormat::genKey(table_id, 1120, 111); + auto [str_val_write2, str_val_default2] = proxy_instance->generateTiKVKeyValue(111, 999); + + auto str_key3 = RecordKVFormat::genKey(table_id, 1180, 111); + auto [str_val_write3, str_val_default3] = proxy_instance->generateTiKVKeyValue(111, 999); + + region->insert("default", TiKVKey::copyFrom(str_key3), TiKVValue::copyFrom(str_val_default3)); + ASSERT_EQ(root_of_kvstore_mem_trackers->get(), str_key3.dataSize() + str_val_default3.size()); + ASSERT_EQ(region->dataSize(), str_key3.dataSize() + str_val_default3.size()); + ASSERT_EQ(region->getData().totalSize(), region->dataSize()); + + MockSSTReader::getMockSSTData().clear(); + MockSSTGenerator default_cf{region_id, table_id, ColumnFamilyType::Default}; + default_cf.insert(1180, str_val_default2); + default_cf.finish_file(); + default_cf.freeze(); + kvs.mutProxyHelperUnsafe()->sst_reader_interfaces = make_mock_sst_reader_interface(); + proxy_instance->snapshot(kvs, ctx.getTMTContext(), region_id, {default_cf}, 0, 0, std::nullopt); + ASSERT_EQ(region->dataSize(), str_key2.dataSize() + str_val_default2.size()); + ASSERT_EQ(region->getData().totalSize(), region->dataSize()); + ASSERT_EQ(root_of_kvstore_mem_trackers->get(), str_key2.dataSize() + str_val_default2.size()); + } + { + // assign + root_of_kvstore_mem_trackers->reset(); + RegionPtr region = tests::makeRegion(1200, start, end, proxy_helper.get()); + region->insert("default", TiKVKey::copyFrom(str_key), TiKVValue::copyFrom(str_val_default)); + ASSERT_EQ(root_of_kvstore_mem_trackers->get(), str_key.dataSize() + str_val_default.size()); + + auto str_key2 = RecordKVFormat::genKey(table_id, 80, 111); + auto [str_val_write2, str_val_default2] = proxy_instance->generateTiKVKeyValue(111, 999); + RegionPtr region2 = tests::makeRegion(1201, start, end, proxy_helper.get()); + region2->insert("default", TiKVKey::copyFrom(str_key2), TiKVValue::copyFrom(str_val_default2)); + region2->insert("default", TiKVKey::copyFrom(str_key), TiKVValue::copyFrom(str_val_default)); + + region->assignRegion(std::move(*region2)); + ASSERT_EQ( + root_of_kvstore_mem_trackers->get(), + str_key.dataSize() + str_val_default.size() + str_key2.dataSize() + str_val_default2.size()); + ASSERT_EQ(region->dataSize(), root_of_kvstore_mem_trackers->get()); + ASSERT_EQ(region->getData().totalSize(), region->dataSize()); + // `region2` is not allowed to access after move, however, we assert here in order to make sure the logic. + ASSERT_EQ(region2->dataSize(), 0); + ASSERT_EQ(region2->getData().totalSize(), region2->dataSize()); } ASSERT_EQ(root_of_kvstore_mem_trackers->get(), 0); } @@ -1107,6 +1251,12 @@ try auto & pool = ctx.getBackgroundPool(); const auto size = TiFlashTestEnv::DEFAULT_BG_POOL_SIZE; std::atomic_bool b = false; + + JointThreadInfoJeallocMap & jm = *ctx.getJointThreadInfoJeallocMap(); + + size_t original_size + = TiFlashMetrics::instance().getStorageThreadMemory(TiFlashMetrics::MemoryAllocType::Alloc, "bg"); + auto t = pool.addTask( [&]() { auto * x = new int[1000]; @@ -1123,12 +1273,11 @@ try 5 * 60 * 1000); std::this_thread::sleep_for(500ms); - JointThreadInfoJeallocMap & jm = *ctx.getJointThreadInfoJeallocMap(); jm.recordThreadAllocInfo(); LOG_INFO(DB::Logger::get(), "bg pool size={}", size); UInt64 r = TiFlashMetrics::instance().getStorageThreadMemory(TiFlashMetrics::MemoryAllocType::Alloc, "bg"); - ASSERT_GE(r, sizeof(int) * 1000); + ASSERT_GE(r, original_size + sizeof(int) * 1000); jm.accessStorageMap([size](const JointThreadInfoJeallocMap::AllocMap & m) { // There are some other bg thread pools ASSERT_GE(m.size(), size) << m.size(); diff --git a/dbms/src/Storages/KVStore/tests/gtest_tikv_keyvalue.cpp b/dbms/src/Storages/KVStore/tests/gtest_tikv_keyvalue.cpp index c43dc3adf29..0663495bbc3 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_tikv_keyvalue.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_tikv_keyvalue.cpp @@ -21,75 +21,6 @@ #include #include -namespace DB -{ -TiKVValue encode_lock_cf_value( - UInt8 lock_type, - const String & primary, - Timestamp ts, - UInt64 ttl, - const String * short_value, - Timestamp min_commit_ts, - Timestamp for_update_ts, - uint64_t txn_size, - const std::vector & async_commit, - const std::vector & rollback, - UInt64 generation = 0) -{ - auto lock_value = RecordKVFormat::encodeLockCfValue(lock_type, primary, ts, ttl, short_value, min_commit_ts); - WriteBufferFromOwnString res; - res.write(lock_value.getStr().data(), lock_value.getStr().size()); - { - res.write(RecordKVFormat::MIN_COMMIT_TS_PREFIX); - RecordKVFormat::encodeUInt64(min_commit_ts, res); - } - { - res.write(RecordKVFormat::FOR_UPDATE_TS_PREFIX); - RecordKVFormat::encodeUInt64(for_update_ts, res); - } - { - res.write(RecordKVFormat::TXN_SIZE_PREFIX); - RecordKVFormat::encodeUInt64(txn_size, res); - } - { - res.write(RecordKVFormat::ROLLBACK_TS_PREFIX); - TiKV::writeVarUInt(rollback.size(), res); - for (auto ts : rollback) - { - RecordKVFormat::encodeUInt64(ts, res); - } - } - { - res.write(RecordKVFormat::ASYNC_COMMIT_PREFIX); - TiKV::writeVarUInt(async_commit.size(), res); - for (const auto & s : async_commit) - { - writeVarInt(s.size(), res); - res.write(s.data(), s.size()); - } - } - { - res.write(RecordKVFormat::LAST_CHANGE_PREFIX); - RecordKVFormat::encodeUInt64(12345678, res); - TiKV::writeVarUInt(87654321, res); - } - { - res.write(RecordKVFormat::TXN_SOURCE_PREFIX_FOR_LOCK); - TiKV::writeVarUInt(876543, res); - } - { - res.write(RecordKVFormat::PESSIMISTIC_LOCK_WITH_CONFLICT_PREFIX); - } - if (generation > 0) - { - res.write(RecordKVFormat::GENERATION_PREFIX); - RecordKVFormat::encodeUInt64(generation, res); - } - return TiKVValue(res.releaseStr()); -} - -} // namespace DB - using namespace DB; using RangeRef = std::pair; @@ -140,7 +71,7 @@ TEST(TiKVKeyValueTest, PortedTests) auto lock_for_update_ts = 7777, txn_size = 1; const std::vector & async_commit = {"s1", "s2"}; const std::vector & rollback = {3, 4}; - auto lock_value = encode_lock_cf_value( + auto lock_value = DB::RegionBench::encodeFullLockCfValue( Region::DelFlag, "primary key", 421321, @@ -242,15 +173,17 @@ TEST(TiKVKeyValueTest, PortedTests) ASSERT_TRUE( d.insert( - RecordKVFormat::genKey(1, 2, 3), - RecordKVFormat::encodeWriteCfValue(Region::PutFlag, 4, "value", true)) + RecordKVFormat::genKey(1, 2, 3), + RecordKVFormat::encodeWriteCfValue(Region::PutFlag, 4, "value", true)) + .payload == 0); ASSERT_TRUE(d.getSize() == 1); ASSERT_TRUE( d.insert( - RecordKVFormat::genKey(1, 2, 3), - RecordKVFormat::encodeWriteCfValue(RecordKVFormat::UselessCFModifyFlag::LockFlag, 4, "value")) + RecordKVFormat::genKey(1, 2, 3), + RecordKVFormat::encodeWriteCfValue(RecordKVFormat::UselessCFModifyFlag::LockFlag, 4, "value")) + .payload == 0); ASSERT_TRUE(d.getSize() == 1); @@ -469,7 +402,7 @@ try auto lock_for_update_ts = 7777, txn_size = 1; const std::vector & async_commit = {"s1", "s2"}; const std::vector & rollback = {3, 4}; - auto lock_value = encode_lock_cf_value( + auto lock_value = DB::RegionBench::encodeFullLockCfValue( Region::DelFlag, "primary key", 421321, @@ -503,7 +436,7 @@ try }); } - auto lock_value2 = encode_lock_cf_value( + auto lock_value2 = DB::RegionBench::encodeFullLockCfValue( Region::DelFlag, "primary key", 421321,