Skip to content
This repository has been archived by the owner on May 9, 2024. It is now read-only.

Commit

Permalink
class slab with index
Browse files Browse the repository at this point in the history
  • Loading branch information
akroviakov committed Oct 11, 2023
1 parent 5917a28 commit 2d1c992
Show file tree
Hide file tree
Showing 15 changed files with 169 additions and 207 deletions.
2 changes: 1 addition & 1 deletion omniscidb/DataMgr/BufferMgr/Buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
namespace Buffer_Namespace {

Buffer::Buffer(BufferMgr* bm,
BufferList::iterator seg_it,
SegmentList::iterator seg_it,
const int device_id,
const size_t page_size,
const size_t num_bytes)
Expand Down
4 changes: 2 additions & 2 deletions omniscidb/DataMgr/BufferMgr/Buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class Buffer : public AbstractBuffer {
*/

Buffer(BufferMgr* bm,
BufferList::iterator seg_it,
SegmentList::iterator seg_it,
const int device_id,
const size_t page_size = 512,
const size_t num_bytes = 0);
Expand Down Expand Up @@ -178,7 +178,7 @@ class Buffer : public AbstractBuffer {
const int src_device_id = -1) = 0;

BufferMgr* bm_;
BufferList::iterator seg_it_;
SegmentList::iterator seg_it_;
size_t page_size_; /// the size of each page in the buffer
size_t num_pages_;
int epoch_; /// indicates when the buffer was last flushed
Expand Down
226 changes: 102 additions & 124 deletions omniscidb/DataMgr/BufferMgr/BufferMgr.cpp

Large diffs are not rendered by default.

116 changes: 49 additions & 67 deletions omniscidb/DataMgr/BufferMgr/BufferMgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,67 +86,51 @@ class TooBigForSlab : public OutOfMemory {
using namespace Data_Namespace;

namespace Buffer_Namespace {

class SlabFreeBufIndex {
public:
SlabFreeBufIndex(BufferList& list_to_index) : list_to_index_(list_to_index) {
for (BufferList::iterator it = list_to_index_.begin(); it != list_to_index_.end();
++it) {
if (it->mem_status == MemStatus::FREE) {
insert(it);
struct Slab {
private:
static bool iterator_page_count_cmp_l(const SegmentList::iterator& lhs,
const size_t num_pages) {
return lhs->num_pages < num_pages;
}
struct SegmentListIterCmp {
bool operator()(const SegmentList::iterator& lhs,
const SegmentList::iterator& rhs) const {
if (lhs->num_pages < rhs->num_pages) {
return true;
}
if (lhs->num_pages > rhs->num_pages) {
return false;
}
return (lhs->start_page < rhs->start_page);
}
CHECK_GT(free_buffs_.size(), 0);
}
};

BufferList::iterator getFreeBuff(const size_t num_requested_pages);
std::set<SegmentList::iterator, SegmentListIterCmp> free_segment_index_;
SegmentList segments_;

void insert(BufferList::iterator& slab_buffer_pos);
public:
Slab(const size_t page_count) : segments_(SegmentList{BufferSeg(0, page_count)}) {
free_segment_index_.insert(segments_.begin());
}

void remove(BufferList::iterator& slab_buffer_pos);
SegmentList::iterator getFreeSegment(const size_t num_requested_pages);

void verify() const;
SegmentList::iterator insert(SegmentList::iterator& slab_buffer_pos,
BufferSeg& data_seg);

const std::vector<BufferList::iterator>& getFreeBuffs() const { return free_buffs_; };
const BufferList& getListToIndex() const { return list_to_index_; };
SegmentList::iterator remove(SegmentList::iterator slab_buffer_pos);

private:
static bool iterator_cmp(const BufferList::iterator& lhs,
const BufferList::iterator& rhs) {
return lhs->num_pages < rhs->num_pages;
}
static bool iterator_page_count_cmp_l(const BufferList::iterator& lhs,
const size_t num_pages) {
return lhs->num_pages < num_pages;
}
static bool iterator_page_count_cmp_u(const size_t num_pages,
const BufferList::iterator& lhs) {
return lhs->num_pages < num_pages;
}
std::vector<BufferList::iterator>::iterator findExact(
BufferList::iterator& slab_buffer_pos) {
auto index_pos = std::lower_bound(
free_buffs_.begin(), free_buffs_.end(), slab_buffer_pos, iterator_cmp);
while (index_pos != free_buffs_.end() && (*index_pos) != slab_buffer_pos &&
(*index_pos)->num_pages == slab_buffer_pos->num_pages) {
index_pos++;
}
CHECK(index_pos != free_buffs_.end());
CHECK((*index_pos) == slab_buffer_pos);
CHECK_EQ((*index_pos)->num_pages, slab_buffer_pos->num_pages);
return index_pos;
}
std::vector<BufferList::iterator> free_buffs_;
BufferList& list_to_index_;
};
void index_insert(SegmentList::iterator& to_insert);
void index_remove(SegmentList::iterator& at);
void verify_index();

struct Slab {
Slab(BufferList& buffers) : buffers_(buffers), free_buffs_index_(buffers_) {}
Slab(BufferList&& buffers)
: buffers_(std::move(buffers)), free_buffs_index_(buffers_) {}
SegmentList::iterator begin() { return segments_.begin(); }
SegmentList::iterator end() { return segments_.end(); }

BufferList buffers_;
SlabFreeBufIndex free_buffs_index_;
const std::set<SegmentList::iterator, SegmentListIterCmp>& getFreeSegments() const {
return free_segment_index_;
};
const SegmentList& getListToIndex() const { return segments_; };
};

struct MemoryData {
Expand Down Expand Up @@ -194,7 +178,7 @@ class BufferMgr : public AbstractBufferMgr { // implements
void clearSlabs();
std::string printMap();
void printSegs();
std::string printSeg(BufferList::iterator& seg_it);
std::string printSeg(SegmentList::iterator& seg_it);
std::string keyToString(const ChunkKey& key);
size_t getInUseSize() override;
size_t getMaxSize() override;
Expand All @@ -203,7 +187,6 @@ class BufferMgr : public AbstractBufferMgr { // implements
size_t getMaxSlabSize();
size_t getPageSize();
bool isAllocationCapped() override;
const std::vector<BufferList> getSlabSegments();

/// Creates a chunk with the specified key and page size.
AbstractBuffer* createBuffer(const ChunkKey& key,
Expand Down Expand Up @@ -255,8 +238,8 @@ class BufferMgr : public AbstractBufferMgr { // implements
size_t size();
size_t getNumChunks() override;

BufferList::iterator reserveBuffer(BufferList::iterator& seg_it,
const size_t num_bytes);
SegmentList::iterator reserveBuffer(SegmentList::iterator& seg_it,
const size_t num_bytes);
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector& chunk_metadata_vec,
const ChunkKey& key_prefix) override;

Expand All @@ -272,18 +255,18 @@ class BufferMgr : public AbstractBufferMgr { // implements
const size_t page_size_;
std::vector<int8_t*> slabs_; /// vector of beginning memory addresses for each
/// allocation of the buffer pool
std::list<Slab> slab_segments_;
std::vector<Slab> slab_segments_;

private:
BufferMgr(const BufferMgr&); // private copy constructor
BufferMgr& operator=(const BufferMgr&); // private assignment
void removeSegment(BufferList::iterator& seg_it);
BufferList::iterator findFreeBufferInSlab(const size_t slab_num,
const size_t num_pages_requested);
void removeSegment(SegmentList::iterator& seg_it);
SegmentList::iterator findFreeSegmentInSlab(const size_t slab_num,
const size_t num_pages_requested);
int getBufferId();
virtual void addSlab(const size_t slab_size) = 0;
virtual void freeAllMem() = 0;
virtual void allocateBuffer(BufferList::iterator seg_it,
virtual void allocateBuffer(SegmentList::iterator seg_it,
const size_t page_size,
const size_t num_bytes) = 0;
virtual AbstractBuffer* allocateZeroCopyBuffer(
Expand All @@ -303,7 +286,7 @@ class BufferMgr : public AbstractBufferMgr { // implements
// to this map should be synced throug chunk_index_mutex_.
std::map<ChunkKey, std::shared_ptr<std::condition_variable>> in_progress_buffer_cvs_;

std::map<ChunkKey, BufferList::iterator> chunk_index_;
std::map<ChunkKey, SegmentList::iterator> chunk_index_;
size_t max_buffer_pool_num_pages_; // max number of pages for buffer pool
size_t num_pages_allocated_;
size_t min_num_pages_per_slab_;
Expand All @@ -314,13 +297,12 @@ class BufferMgr : public AbstractBufferMgr { // implements
int max_buffer_id_;
unsigned int buffer_epoch_;

BufferList unsized_segs_;
SegmentList unsized_segs_;

BufferList::iterator evict(BufferList::iterator& evict_start,
const size_t num_pages_requested,
const int slab_num);
SegmentList::iterator evict(SegmentList::iterator& evict_start,
const size_t num_pages_requested,
const int slab_num);

Slab& getSlab(const size_t slab_num);
/**
* @brief Gets a buffer of required size and returns an iterator to it
*
Expand All @@ -334,7 +316,7 @@ class BufferMgr : public AbstractBufferMgr { // implements
* USED if applicable
*
*/
BufferList::iterator findFreeBuffer(size_t num_bytes);
SegmentList::iterator findFreeSegment(size_t num_bytes);
};

} // namespace Buffer_Namespace
2 changes: 1 addition & 1 deletion omniscidb/DataMgr/BufferMgr/BufferSeg.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,5 +68,5 @@ struct BufferSeg {
, last_touched(last_touched) {}
};

using BufferList = std::list<BufferSeg>;
using SegmentList = std::list<BufferSeg>;
} // namespace Buffer_Namespace
2 changes: 1 addition & 1 deletion omniscidb/DataMgr/BufferMgr/CpuBufferMgr/CpuBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
namespace Buffer_Namespace {

CpuBuffer::CpuBuffer(BufferMgr* bm,
BufferList::iterator segment_iter,
SegmentList::iterator segment_iter,
const int device_id,
GpuMgr* gpu_mgr,
const size_t page_size,
Expand Down
2 changes: 1 addition & 1 deletion omniscidb/DataMgr/BufferMgr/CpuBufferMgr/CpuBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace Buffer_Namespace {
class CpuBuffer : public Buffer {
public:
CpuBuffer(BufferMgr* bm,
BufferList::iterator segment_iter,
SegmentList::iterator segment_iter,
const int device_id,
GpuMgr* gpu_mgr,
const size_t page_size = 512,
Expand Down
4 changes: 2 additions & 2 deletions omniscidb/DataMgr/BufferMgr/CpuBufferMgr/CpuBufferMgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ void CpuBufferMgr::addSlab(const size_t slab_size) {
slabs_.resize(slabs_.size() - 1);
throw FailedToCreateSlab(slab_size);
}
slab_segments_.emplace_back(BufferList{BufferSeg(0, slab_size / page_size_)});
slab_segments_.emplace_back(slab_size / page_size_);
}

void CpuBufferMgr::freeAllMem() {
CHECK(allocator_);
initializeMem();
}

void CpuBufferMgr::allocateBuffer(BufferList::iterator seg_it,
void CpuBufferMgr::allocateBuffer(SegmentList::iterator seg_it,
const size_t page_size,
const size_t initial_size) {
new CpuBuffer(this,
Expand Down
2 changes: 1 addition & 1 deletion omniscidb/DataMgr/BufferMgr/CpuBufferMgr/CpuBufferMgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class CpuBufferMgr : public BufferMgr {
protected:
void addSlab(const size_t slab_size) override;
void freeAllMem() override;
void allocateBuffer(BufferList::iterator segment_iter,
void allocateBuffer(SegmentList::iterator segment_iter,
const size_t page_size,
const size_t initial_size) override;
virtual void initializeMem();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ void TieredCpuBufferMgr::addSlab(const size_t slab_size) {
}
if (allocated_slab) {
// We allocated a new slab, so add segments for it.
slab_segments_.emplace_back(BufferList{BufferSeg(0, slab_size / page_size_)});
slab_segments_.emplace_back(slab_size / page_size_);
LOG(INFO) << "Allocated slab using " << tier_to_string(last_tier) << ".";
} else {
// None of the allocators allocated a slab, so revert to original size and throw.
Expand Down
2 changes: 1 addition & 1 deletion omniscidb/DataMgr/BufferMgr/GpuBufferMgr/GpuBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
namespace Buffer_Namespace {

GpuBuffer::GpuBuffer(BufferMgr* bm,
BufferList::iterator seg_it,
SegmentList::iterator seg_it,
const int device_id,
GpuMgr* gpu_mgr,
const size_t page_size,
Expand Down
2 changes: 1 addition & 1 deletion omniscidb/DataMgr/BufferMgr/GpuBufferMgr/GpuBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace Buffer_Namespace {
class GpuBuffer : public Buffer {
public:
GpuBuffer(BufferMgr* bm,
BufferList::iterator seg_it,
SegmentList::iterator seg_it,
const int device_id,
GpuMgr* gpu_mgr,
const size_t page_size = 512,
Expand Down
4 changes: 2 additions & 2 deletions omniscidb/DataMgr/BufferMgr/GpuBufferMgr/GpuBufferMgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ void GpuBufferMgr::addSlab(const size_t slab_size) {
slabs_.resize(slabs_.size() - 1);
throw FailedToCreateSlab(slab_size);
}
slab_segments_.emplace_back(BufferList{BufferSeg(0, slab_size / page_size_)});
slab_segments_.emplace_back(slab_size / page_size_);
}

void GpuBufferMgr::freeAllMem() {
Expand All @@ -72,7 +72,7 @@ void GpuBufferMgr::freeAllMem() {
}
}

void GpuBufferMgr::allocateBuffer(BufferList::iterator seg_it,
void GpuBufferMgr::allocateBuffer(SegmentList::iterator seg_it,
const size_t page_size,
const size_t initial_size) {
new GpuBuffer(this,
Expand Down
2 changes: 1 addition & 1 deletion omniscidb/DataMgr/BufferMgr/GpuBufferMgr/GpuBufferMgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class GpuBufferMgr : public BufferMgr {
private:
void addSlab(const size_t slab_size) override;
void freeAllMem() override;
void allocateBuffer(BufferList::iterator seg_it,
void allocateBuffer(SegmentList::iterator seg_it,
const size_t page_size,
const size_t initial_size) override;
GpuMgr* gpu_mgr_;
Expand Down
4 changes: 3 additions & 1 deletion omniscidb/QueryEngine/RelAlgExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1448,11 +1448,12 @@ ExecutionResult RelAlgExecutor::executeWorkUnit(
}
const auto body = work_unit.body;
CHECK(body);
auto timer1 = DEBUG_TIMER("get_table_infos");
const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
auto timer2 = DEBUG_TIMER("ALL");

auto ra_exe_unit = decide_approx_count_distinct_implementation(
work_unit.exe_unit, table_infos, executor_, co.device_type, target_exprs_owned_);

auto max_groups_buffer_entry_guess = work_unit.max_groups_buffer_entry_guess;
if (is_window_execution_unit(ra_exe_unit)) {
CHECK_EQ(table_infos.size(), size_t(1));
Expand Down Expand Up @@ -1484,6 +1485,7 @@ ExecutionResult RelAlgExecutor::executeWorkUnit(
eo.output_columnar_hint = true;
}
}
timer2.stop();

ExecutionResult result;
auto execute_and_handle_errors = [&](const auto max_groups_buffer_entry_guess_in,
Expand Down

0 comments on commit 2d1c992

Please sign in to comment.