Skip to content

Commit

Permalink
add totalSize
Browse files Browse the repository at this point in the history
Signed-off-by: Calvin Neo <[email protected]>
  • Loading branch information
CalvinNeo committed Jan 17, 2025
1 parent 92abc36 commit 81a3f64
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 22 deletions.
61 changes: 45 additions & 16 deletions dbms/src/Storages/KVStore/Decode/RegionTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,11 @@ namespace FailPoints
extern const char force_set_num_regions_for_table[];
} // namespace FailPoints

void RegionTable::InternalRegion::updateRegionCacheBytes(size_t cache_bytes_)
int64_t RegionTable::InternalRegion::updateRegionCacheBytes(size_t cache_bytes_)
{
auto diff = static_cast<int64_t>(cache_bytes_) - static_cast<int64_t>(cache_bytes);
cache_bytes = cache_bytes_;
return diff;
}

RegionTable::Table & RegionTable::getOrCreateTable(const KeyspaceID keyspace_id, const TableID table_id)
Expand All @@ -64,31 +66,45 @@ RegionTable::Table & RegionTable::getOrCreateTable(const KeyspaceID keyspace_id,
return it->second;
}

bool RegionTable::hasTable(KeyspaceID keyspace_id, TableID table_id)
{
auto ks_table_id = KeyspaceTableID{keyspace_id, table_id};
auto it = tables.find(ks_table_id);
if (it == tables.end())
{
return false;
}
return true;
}


RegionTable::InternalRegion & RegionTable::insertRegion(Table & table, const Region & region)
{
const auto range = region.getRange();
return insertRegion(table, *range, region.id());
return insertRegion(table, *range, region);
}

RegionTable::InternalRegion & RegionTable::insertRegion(
Table & table,
const RegionRangeKeys & region_range_keys,
const RegionID region_id)
const Region & region)
{
auto keyspace_id = region_range_keys.getKeyspaceID();
auto & table_regions = table.regions;
// Insert table mapping.
// todo check if region_range_keys.mapped_table_id == table.table_id ??
auto [it, ok] = table_regions.emplace(region_id, InternalRegion(region_id, region_range_keys.rawKeys()));
auto [it, ok] = table_regions.emplace(region.id(), InternalRegion(region.id(), region_range_keys.rawKeys()));
if (!ok)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"{}: insert duplicate internal region, region_id={}",
__PRETTY_FUNCTION__,
region_id);
region.id());

// Insert region mapping.
regions[region_id] = KeyspaceTableID{keyspace_id, table.table_id};
regions[region.id()] = KeyspaceTableID{keyspace_id, table.table_id};

table.updateTableCacheBytes(it->second.updateRegionCacheBytes(region.totalSize()));

return it->second;
}
Expand All @@ -110,6 +126,14 @@ RegionTable::InternalRegion & RegionTable::getOrInsertRegion(const Region & regi
return insertRegion(table, region);
}

void RegionTable::updateTableCacheBytes(const Region & region, int64_t diff)
{
auto keyspace_id = region.getKeyspaceID();
auto table_id = region.getMappedTableID();
auto & table = getOrCreateTable(keyspace_id, table_id);
table.updateTableCacheBytes(diff);
}

RegionTable::RegionTable(Context & context_)
: context(&context_)
, log(Logger::get())
Expand Down Expand Up @@ -161,7 +185,7 @@ void RegionTable::updateRegion(const Region & region)
{
std::lock_guard lock(mutex);
auto & internal_region = getOrInsertRegion(region);
internal_region.updateRegionCacheBytes(region.dataSize());
updateTableCacheBytes(region, internal_region.updateRegionCacheBytes(region.totalSize()));
}

namespace
Expand Down Expand Up @@ -313,7 +337,7 @@ RegionDataReadInfoList RegionTable::tryWriteBlockByRegion(const RegionPtr & regi

func_update_region([&](InternalRegion & internal_region) -> bool {
internal_region.pause_flush = false;
internal_region.updateRegionCacheBytes(region->dataSize());
updateTableCacheBytes(*region, internal_region.updateRegionCacheBytes(region->totalSize()));

internal_region.last_flush_time = Clock::now();
return true;
Expand Down Expand Up @@ -385,32 +409,32 @@ void RegionTable::shrinkRegionRange(const Region & region)
std::lock_guard lock(mutex);
auto & internal_region = getOrInsertRegion(region);
internal_region.range_in_table = region.getRange()->rawKeys();
internal_region.updateRegionCacheBytes(region.dataSize());
updateTableCacheBytes(region, internal_region.updateRegionCacheBytes(region.totalSize()));
}

void RegionTable::extendRegionRange(const RegionID region_id, const RegionRangeKeys & region_range_keys)
void RegionTable::extendRegionRange(const Region & region, const RegionRangeKeys & region_range_keys)
{
std::lock_guard lock(mutex);

auto keyspace_id = region_range_keys.getKeyspaceID();
auto table_id = region_range_keys.getMappedTableID();
auto new_handle_range = region_range_keys.rawKeys();

if (auto it = regions.find(region_id); it != regions.end())
if (auto it = regions.find(region.id()); it != regions.end())
{
auto ks_tbl_id = KeyspaceTableID{keyspace_id, table_id};
RUNTIME_CHECK_MSG(
ks_tbl_id == it->second,
"{}: table id not match the previous one"
", region_id={} keyspace={} table_id={} old_keyspace={} old_table_id={}",
__PRETTY_FUNCTION__,
region_id,
region.id(),
keyspace_id,
table_id,
it->second.first,
it->second.second);

InternalRegion & internal_region = doGetInternalRegion(ks_tbl_id, region_id);
InternalRegion & internal_region = doGetInternalRegion(ks_tbl_id, region.id());

if (*(internal_region.range_in_table.first) <= *(new_handle_range.first)
&& *(internal_region.range_in_table.second) >= *(new_handle_range.second))
Expand All @@ -420,7 +444,7 @@ void RegionTable::extendRegionRange(const RegionID region_id, const RegionRangeK
"internal region has larger range, keyspace={} table_id={} region_id={}",
keyspace_id,
table_id,
region_id);
region.id());
}
else
{
Expand All @@ -435,8 +459,13 @@ void RegionTable::extendRegionRange(const RegionID region_id, const RegionRangeK
else
{
auto & table = getOrCreateTable(keyspace_id, table_id);
insertRegion(table, region_range_keys, region_id);
LOG_INFO(log, "insert internal region, keyspace={} table_id={} region_id={}", keyspace_id, table_id, region_id);
insertRegion(table, region_range_keys, region);
LOG_INFO(
log,
"insert internal region, keyspace={} table_id={} region_id={}",
keyspace_id,
table_id,
region.id());
}
}

Expand Down
16 changes: 12 additions & 4 deletions dbms/src/Storages/KVStore/Decode/RegionTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,11 @@ class RegionTable : private boost::noncopyable
, range_in_table(range_in_table_)
{}

void updateRegionCacheBytes(size_t);
int64_t updateRegionCacheBytes(size_t);

RegionID region_id;
std::pair<DecodedTiKVKeyPtr, DecodedTiKVKeyPtr> range_in_table;
bool pause_flush = false;
bool pause_flush = false; // TODO Can we remove this?
Timepoint last_flush_time = Clock::now();

private:
Expand All @@ -95,11 +95,18 @@ class RegionTable : private boost::noncopyable
{
explicit Table(const TableID table_id_)
: table_id(table_id_)
, size(0)
{}
void updateTableCacheBytes(UInt64 diff) { size.fetch_add(diff); }
TableID table_id;
InternalRegions regions;

private:
std::atomic_int64_t size;
};

void updateTableCacheBytes(const Region &, int64_t);

explicit RegionTable(Context & context_);
void restore();

Expand All @@ -109,7 +116,7 @@ class RegionTable : private boost::noncopyable
void shrinkRegionRange(const Region & region);

/// extend range for possible InternalRegion or add one.
void extendRegionRange(RegionID region_id, const RegionRangeKeys & region_range_keys);
void extendRegionRange(const Region & region, const RegionRangeKeys & region_range_keys);

void removeRegion(RegionID region_id, bool remove_data, const RegionTaskLock &);

Expand Down Expand Up @@ -151,6 +158,7 @@ class RegionTable : private boost::noncopyable
const LoggerPtr & log);

void clear();
bool hasTable(KeyspaceID keyspace_id, TableID table_id);

public:
// safe ts is maintained by check_leader RPC (https://github.com/tikv/tikv/blob/1ea26a2ac8761af356cc5c0825eb89a0b8fc9749/components/resolved_ts/src/advance.rs#L262),
Expand Down Expand Up @@ -183,7 +191,7 @@ class RegionTable : private boost::noncopyable
Table & getOrCreateTable(KeyspaceID keyspace_id, TableID table_id);
void removeTable(KeyspaceID keyspace_id, TableID table_id);
InternalRegion & getOrInsertRegion(const Region & region);
InternalRegion & insertRegion(Table & table, const RegionRangeKeys & region_range_keys, RegionID region_id);
InternalRegion & insertRegion(Table & table, const RegionRangeKeys & region_range_keys, const Region & region);
InternalRegion & insertRegion(Table & table, const Region & region);
InternalRegion & doGetInternalRegion(KeyspaceTableID ks_table_id, RegionID region_id);

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/KVStore/MultiRaft/ApplySnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ void KVStore::onSnapshot(
const auto range = new_region_wrap->getRange();
auto & region_table = tmt.getRegionTable();
// extend region to make sure data won't be removed.
region_table.extendRegionRange(region_id, *range);
region_table.extendRegionRange(*new_region_wrap, *range);
}

// Register the new Region.
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ RegionID RegionRaftCommandDelegate::execCommitMerge(
: source_region_meta_delegate.regionState().getRegion().end_key();

region_table.extendRegionRange(
id(),
*this,
RegionRangeKeys(TiKVKey::copyFrom(new_start_key), TiKVKey::copyFrom(new_end_key)));
}

Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Storages/KVStore/Region.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,12 @@ size_t Region::dataSize() const
return data.dataSize();
}

size_t Region::totalSize() const
{
return data.totalSize() + sizeof(RegionMeta);
}


size_t Region::writeCFCount() const
{
std::shared_lock<std::shared_mutex> lock(mutex);
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/KVStore/Region.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ class Region : public std::enable_shared_from_this<Region>

// Payload size in RegionData, show how much data flows in/out of the Region.
size_t dataSize() const;
// How much memory the Region consumes.
size_t totalSize() const;
size_t writeCFCount() const;
std::string dataInfo() const;

Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ try
{RecordKVFormat::genKey(table_id, 11), RecordKVFormat::genKey(table_id, 20)},
{RecordKVFormat::genKey(table_id, 21), RecordKVFormat::genKey(table_id, 30)}});

auto & region_table = ctx.getTMTContext().getRegionTable();
ASSERT_TRUE(region_table.hasTable(NullspaceID, table_id));
{
// default
auto region_id = 1;
Expand Down

0 comments on commit 81a3f64

Please sign in to comment.