From 2d1c992bc00b470c033cff83981ac4ef919522fc Mon Sep 17 00:00:00 2001 From: Artem Kroviakkov Date: Wed, 11 Oct 2023 03:27:19 -0700 Subject: [PATCH] class slab with index --- omniscidb/DataMgr/BufferMgr/Buffer.cpp | 2 +- omniscidb/DataMgr/BufferMgr/Buffer.h | 4 +- omniscidb/DataMgr/BufferMgr/BufferMgr.cpp | 226 ++++++++---------- omniscidb/DataMgr/BufferMgr/BufferMgr.h | 116 ++++----- omniscidb/DataMgr/BufferMgr/BufferSeg.h | 2 +- .../BufferMgr/CpuBufferMgr/CpuBuffer.cpp | 2 +- .../BufferMgr/CpuBufferMgr/CpuBuffer.h | 2 +- .../BufferMgr/CpuBufferMgr/CpuBufferMgr.cpp | 4 +- .../BufferMgr/CpuBufferMgr/CpuBufferMgr.h | 2 +- .../CpuBufferMgr/TieredCpuBufferMgr.cpp | 2 +- .../BufferMgr/GpuBufferMgr/GpuBuffer.cpp | 2 +- .../BufferMgr/GpuBufferMgr/GpuBuffer.h | 2 +- .../BufferMgr/GpuBufferMgr/GpuBufferMgr.cpp | 4 +- .../BufferMgr/GpuBufferMgr/GpuBufferMgr.h | 2 +- omniscidb/QueryEngine/RelAlgExecutor.cpp | 4 +- 15 files changed, 169 insertions(+), 207 deletions(-) diff --git a/omniscidb/DataMgr/BufferMgr/Buffer.cpp b/omniscidb/DataMgr/BufferMgr/Buffer.cpp index dc804b69ba..5b767b66f1 100644 --- a/omniscidb/DataMgr/BufferMgr/Buffer.cpp +++ b/omniscidb/DataMgr/BufferMgr/Buffer.cpp @@ -25,7 +25,7 @@ namespace Buffer_Namespace { Buffer::Buffer(BufferMgr* bm, - BufferList::iterator seg_it, + SegmentList::iterator seg_it, const int device_id, const size_t page_size, const size_t num_bytes) diff --git a/omniscidb/DataMgr/BufferMgr/Buffer.h b/omniscidb/DataMgr/BufferMgr/Buffer.h index 92a880e2d4..eb834204b8 100644 --- a/omniscidb/DataMgr/BufferMgr/Buffer.h +++ b/omniscidb/DataMgr/BufferMgr/Buffer.h @@ -63,7 +63,7 @@ class Buffer : public AbstractBuffer { */ Buffer(BufferMgr* bm, - BufferList::iterator seg_it, + SegmentList::iterator seg_it, const int device_id, const size_t page_size = 512, const size_t num_bytes = 0); @@ -178,7 +178,7 @@ class Buffer : public AbstractBuffer { const int src_device_id = -1) = 0; BufferMgr* bm_; - BufferList::iterator seg_it_; + SegmentList::iterator seg_it_; size_t page_size_; /// the size of each page in the buffer size_t num_pages_; int epoch_; /// indicates when the buffer was last flushed diff --git a/omniscidb/DataMgr/BufferMgr/BufferMgr.cpp b/omniscidb/DataMgr/BufferMgr/BufferMgr.cpp index 6f5a743d59..eb739bfed0 100644 --- a/omniscidb/DataMgr/BufferMgr/BufferMgr.cpp +++ b/omniscidb/DataMgr/BufferMgr/BufferMgr.cpp @@ -34,68 +34,75 @@ using namespace std; namespace Buffer_Namespace { -BufferList::iterator SlabFreeBufIndex::getFreeBuff(const size_t num_requested_pages) { - if (num_requested_pages > (*std::prev(free_buffs_.end()))->num_pages) { +SegmentList::iterator Slab::getFreeSegment(const size_t num_requested_pages) { + if (num_requested_pages > (*std::prev(free_segment_index_.end()))->num_pages) { // If the largest buffer can't fit, then nothing will. - return list_to_index_.end(); + return segments_.end(); } - auto result = *std::lower_bound(free_buffs_.begin(), - free_buffs_.end(), + auto result = *std::lower_bound(free_segment_index_.begin(), + free_segment_index_.end(), num_requested_pages, iterator_page_count_cmp_l); CHECK_EQ(result->mem_status, MemStatus::FREE); return result; } -void SlabFreeBufIndex::insert(BufferList::iterator& slab_buffer_pos) { - CHECK_EQ(slab_buffer_pos->mem_status, MemStatus::FREE); - auto index_pos = std::lower_bound( - free_buffs_.begin(), free_buffs_.end(), slab_buffer_pos, iterator_cmp); - free_buffs_.insert(index_pos, slab_buffer_pos); +SegmentList::iterator Slab::insert(SegmentList::iterator& at, BufferSeg& data_seg) { + auto my_segments_iter = segments_.insert(at, data_seg); + if (data_seg.mem_status == MemStatus::FREE) { + index_insert(my_segments_iter); + } + return my_segments_iter; } -void SlabFreeBufIndex::remove(BufferList::iterator& slab_buffer_pos) { - CHECK(slab_buffer_pos != list_to_index_.end()); - if (slab_buffer_pos->mem_status != MemStatus::FREE) { - verify(); +SegmentList::iterator Slab::remove(SegmentList::iterator to_erase) { + if (to_erase->mem_status == MemStatus::FREE) { + auto index_iter = free_segment_index_.find(to_erase); + CHECK(index_iter != free_segment_index_.end()); + index_remove(to_erase); } - CHECK_EQ(slab_buffer_pos->mem_status, MemStatus::FREE); - auto index_pos = findExact(slab_buffer_pos); - free_buffs_.erase(index_pos); + return segments_.erase(to_erase); +} + +void Slab::index_insert(SegmentList::iterator& to_insert) { + CHECK(to_insert->mem_status == MemStatus::FREE); + auto [_, success] = free_segment_index_.insert(to_insert); + CHECK(success); +} + +void Slab::index_remove(SegmentList::iterator& to_erase) { + CHECK(to_erase->mem_status == MemStatus::FREE); + auto index_iter = free_segment_index_.find(to_erase); + CHECK(index_iter != free_segment_index_.end()); + free_segment_index_.erase(index_iter); } -void SlabFreeBufIndex::verify() const { +void Slab::verify_index() { bool failed{false}; - for (BufferList::iterator it = list_to_index_.begin(); it != list_to_index_.end(); - ++it) { - if (it->mem_status == MemStatus::FREE) { - auto index_pos = - std::lower_bound(free_buffs_.begin(), free_buffs_.end(), it, iterator_cmp); - if ((index_pos == free_buffs_.end()) || - ((*index_pos)->num_pages != it->num_pages)) { - failed = true; - } - while (index_pos != free_buffs_.end() && (*index_pos) != it && - (*index_pos)->num_pages == it->num_pages) { - index_pos++; - } - if ((index_pos == free_buffs_.end()) || - ((*index_pos)->num_pages != it->num_pages)) { + for (SegmentList::iterator segment_it = segments_.begin(); + segment_it != segments_.end(); + ++segment_it) { + if (segment_it->mem_status == MemStatus::FREE) { + auto index_pos = free_segment_index_.find(segment_it); + if ((index_pos == free_segment_index_.end()) || + ((*index_pos)->num_pages != segment_it->num_pages)) { failed = true; } } } if (failed) { LOG(INFO) << "Slab has:"; - for (BufferList::iterator it = list_to_index_.begin(); it != list_to_index_.end(); - ++it) { - if (it->mem_status == MemStatus::FREE) { - LOG(INFO) << it->start_page << " , " << it->num_pages; + for (SegmentList::iterator segment_it = segments_.begin(); + segment_it != segments_.end(); + ++segment_it) { + if (segment_it->mem_status == MemStatus::FREE) { + LOG(INFO) << "start_page=" << segment_it->start_page + << ", num_pages=" << segment_it->num_pages; } } LOG(INFO) << "Index has:"; - for (auto& x : free_buffs_) { - LOG(INFO) << x->start_page << " , " << x->num_pages; + for (auto& x : free_segment_index_) { + LOG(INFO) << "start_page=" << x->start_page << ", num_pages=" << x->num_pages; } } CHECK(!failed); @@ -193,7 +200,7 @@ AbstractBuffer* BufferMgr::createBuffer(const ChunkKey& chunk_key, } // chunk_page_size is just for recording dirty pages - BufferList::iterator seg_it; + SegmentList::iterator seg_it; { std::lock_guard lock(chunk_index_mutex_); CHECK(chunk_index_.find(chunk_key) == chunk_index_.end()); @@ -232,15 +239,9 @@ AbstractBuffer* BufferMgr::createZeroCopyBuffer( return allocateZeroCopyBuffer(page_size_, std::move(token)); } -Slab& BufferMgr::getSlab(const size_t slab_num) { - auto slab_it = slab_segments_.begin(); - std::advance(slab_it, slab_num); - return *slab_it; -} - -BufferList::iterator BufferMgr::evict(BufferList::iterator& evict_start, - const size_t num_pages_requested, - const int slab_num) { +SegmentList::iterator BufferMgr::evict(SegmentList::iterator& evict_start, + const size_t num_pages_requested, + const int slab_num) { // It is assumed that caller holds a lock on sized_segs_mutex_. // We can assume here that buffer for evictStart either doesn't exist // (evictStart is first buffer) or was not free, so don't need ot merge @@ -248,9 +249,7 @@ BufferList::iterator BufferMgr::evict(BufferList::iterator& evict_start, auto evict_it = evict_start; size_t num_pages = 0; size_t start_page = evict_start->start_page; - auto& slab = getSlab(slab_num); - auto& slab_segs = slab.buffers_; - auto& free_segs_index = slab.free_buffs_index_; + auto& slab = slab_segments_[slab_num]; while (num_pages < num_pages_requested) { if (evict_it->mem_status == USED) { CHECK(evict_it->buffer->getPinCount() < 1); @@ -264,35 +263,34 @@ BufferList::iterator BufferMgr::evict(BufferList::iterator& evict_start, // a memleak. delete evict_it->buffer; } - if (evict_it->mem_status == FREE) { - free_segs_index.remove(evict_it); - } - evict_it = slab_segs.erase( - evict_it); // erase operations returns next iterator - safe if we ever move - // to a vector (as opposed to erase(evict_it++) + evict_it = + slab.remove(evict_it); // erase operations returns next iterator - safe if we + // ever move to a vector (as opposed to erase(evict_it++) } + BufferSeg data_seg( start_page, num_pages_requested, USED, buffer_epoch_++); // until we can // data_seg.pinCount++; data_seg.slab_num = slab_num; - auto data_seg_it = slab_segs.insert(evict_it, data_seg); // Will insert before evict_it + auto data_seg_it = slab.insert(evict_it, data_seg); // Will insert before evict_it if (num_pages_requested < num_pages) { const size_t excess_pages = num_pages - num_pages_requested; - if (evict_it != slab_segs.end() && + if (evict_it != slab.end() && evict_it->mem_status == FREE) { // need to merge with current page + slab.index_remove(evict_it); evict_it->start_page = start_page + num_pages_requested; evict_it->num_pages += excess_pages; + slab.index_insert(evict_it); } else { // need to insert a free seg before evict_it for excess_pages BufferSeg free_seg(start_page + num_pages_requested, excess_pages, FREE); - auto free_seg_it = slab_segs.insert(evict_it, free_seg); - free_segs_index.insert(free_seg_it); + slab.insert(evict_it, free_seg); } } return data_seg_it; } -BufferList::iterator BufferMgr::reserveBuffer( - BufferList::iterator& seg_it, +SegmentList::iterator BufferMgr::reserveBuffer( + SegmentList::iterator& seg_it, const size_t num_bytes) { // assumes buffer is already pinned std::unique_lock sized_segs_lock(sized_segs_mutex_); @@ -306,25 +304,23 @@ BufferList::iterator BufferMgr::reserveBuffer( } // First check for free segment after seg_it if (slab_num >= 0) { // not dummy page - BufferList::iterator next_it = std::next(seg_it); - auto& slab = getSlab(slab_num); - auto& slab_segs = slab.buffers_; - auto& free_segs_index = slab.free_buffs_index_; - if (next_it != slab_segs.end() && next_it->mem_status == FREE && + SegmentList::iterator next_it = std::next(seg_it); + auto& slab = slab_segments_[slab_num]; + if (next_it != slab.end() && next_it->mem_status == FREE && next_it->num_pages >= num_pages_extra_needed) { // Then we can just use the next BufferSeg which happens to be free - const size_t new_num_pages = next_it->num_pages - num_pages_extra_needed; - free_segs_index.remove(next_it); + CHECK_NE(seg_it->mem_status, FREE); + slab.index_remove(next_it); seg_it->num_pages = num_pages_requested; - next_it->num_pages = new_num_pages; + next_it->num_pages = next_it->num_pages - num_pages_extra_needed; next_it->start_page = seg_it->start_page + seg_it->num_pages; - free_segs_index.insert(next_it); + slab.index_insert(next_it); return seg_it; } } // If we're here then we couldn't keep buffer in existing slot // need to find new segment, copy data over, and then delete old - auto new_seg_it = findFreeBuffer(num_bytes); + auto new_seg_it = findFreeSegment(num_bytes); // Below should be in copy constructor for BufferSeg? new_seg_it->buffer = seg_it->buffer; @@ -351,16 +347,14 @@ BufferList::iterator BufferMgr::reserveBuffer( return new_seg_it; } -BufferList::iterator BufferMgr::findFreeBufferInSlab(const size_t slab_num, - const size_t num_pages_requested) { +SegmentList::iterator BufferMgr::findFreeSegmentInSlab(const size_t slab_num, + const size_t num_pages_requested) { // It is assumed that caller holds a lock on sized_segs_mutex_. - auto& slab = getSlab(slab_num); - auto& slab_segs = slab.buffers_; - auto& free_segs_index = slab.free_buffs_index_; - auto found = free_segs_index.getFreeBuff(num_pages_requested); + auto& slab = slab_segments_[slab_num]; + auto found = slab.getFreeSegment(num_pages_requested); - if (found != slab_segs.end()) { - free_segs_index.remove(found); + if (found != slab.end()) { + slab.index_remove(found); const size_t excess_pages = found->num_pages - num_pages_requested; found->num_pages = num_pages_requested; found->mem_status = USED; @@ -368,14 +362,14 @@ BufferList::iterator BufferMgr::findFreeBufferInSlab(const size_t slab_num, found->slab_num = slab_num; if (excess_pages > 0) { BufferSeg free_seg(found->start_page + num_pages_requested, excess_pages, FREE); - auto it = slab_segs.insert(std::next(found), free_seg); - free_segs_index.insert(it); + auto it = std::next(found); + slab.insert(it, free_seg); } } return found; } -BufferList::iterator BufferMgr::findFreeBuffer(const size_t num_bytes) { +SegmentList::iterator BufferMgr::findFreeSegment(const size_t num_bytes) { // It is assumed that caller holds a lock on sized_segs_mutex_. const size_t num_pages_requested = (num_bytes + page_size_ - 1) / page_size_; const size_t num_slabs = slab_segments_.size(); @@ -383,9 +377,9 @@ BufferList::iterator BufferMgr::findFreeBuffer(const size_t num_bytes) { throw TooBigForSlab(num_bytes); } for (size_t slab_num = 0; slab_num != num_slabs; ++slab_num) { - auto& slab_segs = getSlab(slab_num).buffers_; - auto seg_it = findFreeBufferInSlab(slab_num, num_pages_requested); - if (seg_it != slab_segs.end()) { + auto& slab = slab_segments_[slab_num]; + auto seg_it = findFreeSegmentInSlab(slab_num, num_pages_requested); + if (seg_it != slab.end()) { return seg_it; } } @@ -411,7 +405,7 @@ BufferList::iterator BufferMgr::findFreeBuffer(const size_t num_bytes) { } // if here then addSlab succeeded num_pages_allocated_ += current_max_slab_page_size_; - return findFreeBufferInSlab( + return findFreeSegmentInSlab( num_slabs, num_pages_requested); // has to succeed since we made sure to request a slab // big enough to accomodate request @@ -451,14 +445,13 @@ BufferList::iterator BufferMgr::findFreeBuffer(const size_t num_bytes) { // We're going for lowest score here, like golf // This is because score is the sum of the lastTouched score for all pages evicted. // Evicting fewer pages and older pages will lower the score - BufferList::iterator best_eviction_start = slab_segments_.begin()->buffers_.end(); + SegmentList::iterator best_eviction_start = slab_segments_.begin()->end(); int best_eviction_start_slab_num = -1; int slab_num = 0; for (auto slab_it = slab_segments_.begin(); slab_it != slab_segments_.end(); ++slab_it, ++slab_num) { - auto& slab_segs = slab_it->buffers_; - for (auto buffer_it = slab_segs.begin(); buffer_it != slab_segs.end(); ++buffer_it) { + for (auto buffer_it = slab_it->begin(); buffer_it != slab_it->end(); ++buffer_it) { // Note there are some shortcuts we could take here - like we should never // consider a USED buffer coming after a free buffer as we would have used the // FREE buffer, but we won't worry about this for now @@ -470,7 +463,7 @@ BufferList::iterator BufferMgr::findFreeBuffer(const size_t num_bytes) { size_t score = 0; bool solution_found = false; auto evict_it = buffer_it; - for (; evict_it != slab_segs.end(); ++evict_it) { + for (; evict_it != slab_it->end(); ++evict_it) { // pinCount should never go up - only down because we have // global lock on buffer pool and pin count only increments // on getChunk @@ -498,7 +491,7 @@ BufferList::iterator BufferMgr::findFreeBuffer(const size_t num_bytes) { min_score = score; best_eviction_start = buffer_it; best_eviction_start_slab_num = slab_num; - } else if (evict_it == slab_segs.end()) { + } else if (evict_it == slab_it->end()) { // this means that every segment after this will fail as well, so our search has // proven futile // throw std::runtime_error ("Couldn't evict chunks to get free space"); @@ -510,7 +503,7 @@ BufferList::iterator BufferMgr::findFreeBuffer(const size_t num_bytes) { //} } } - if (best_eviction_start == slab_segments_.begin()->buffers_.end()) { + if (best_eviction_start == slab_segments_.begin()->end()) { LOG(ERROR) << "ALLOCATION failed to find " << num_bytes << "B throwing out of memory " << getStringMgrType() << ":" << device_id_; VLOG(2) << printSlabs(); @@ -530,7 +523,7 @@ std::string BufferMgr::printSlab(size_t slab_num) { std::ostringstream tss; // size_t lastEnd = 0; tss << "Slab St.Page Pages Touch" << std::endl; - for (auto segment : getSlab(slab_num).buffers_) { + for (auto segment : slab_segments_[slab_num]) { tss << setfill(' ') << setw(4) << slab_num; // tss << " BSN: " << setfill(' ') << setw(2) << segment.slab_num; tss << setfill(' ') << setw(8) << segment.start_page; @@ -570,7 +563,7 @@ std::string BufferMgr::printSlabs() { void BufferMgr::clearSlabs() { bool pinned_exists = false; - for (auto& [segment_list, _] : slab_segments_) { + for (auto& segment_list : slab_segments_) { for (auto& segment : segment_list) { if (segment.mem_status == FREE) { // no need to free @@ -611,7 +604,7 @@ size_t BufferMgr::getPageSize() { // return the size of the chunks in use in bytes size_t BufferMgr::getInUseSize() { size_t in_use = 0; - for (auto& [segment_list, _] : slab_segments_) { + for (auto& segment_list : slab_segments_) { for (auto& segment : segment_list) { if (segment.mem_status != FREE) { in_use += segment.num_pages * page_size_; @@ -621,7 +614,7 @@ size_t BufferMgr::getInUseSize() { return in_use; } -std::string BufferMgr::printSeg(BufferList::iterator& seg_it) { +std::string BufferMgr::printSeg(SegmentList::iterator& seg_it) { std::ostringstream tss; tss << "SN: " << setfill(' ') << setw(2) << seg_it->slab_num; tss << " SP: " << setfill(' ') << setw(7) << seg_it->start_page; @@ -670,11 +663,9 @@ void BufferMgr::printSegs() { LOG(INFO) << std::endl << " " << getStringMgrType() << ":" << device_id_; for (auto slab_it = slab_segments_.begin(); slab_it != slab_segments_.end(); ++slab_it, ++slab_num) { - auto& slab_segs = slab_it->buffers_; LOG(INFO) << "Slab Num: " << slab_num << " " << getStringMgrType() << ":" << device_id_; - for (auto seg_it = slab_segs.begin(); seg_it != slab_segs.end(); - ++seg_it, ++seg_num) { + for (auto seg_it = slab_it->begin(); seg_it != slab_it->end(); ++seg_it, ++seg_num) { LOG(INFO) << "Segment: " << seg_num << " " << getStringMgrType() << ":" << device_id_; printSeg(seg_it); @@ -744,7 +735,7 @@ void BufferMgr::deleteBuffersWithPrefix(const ChunkKey& key_prefix, const bool) } } -void BufferMgr::removeSegment(BufferList::iterator& seg_it) { +void BufferMgr::removeSegment(SegmentList::iterator& seg_it) { // If seg_it is referencing some slab then caller of this method // should hold a lock for sized_segs_mutex_. // Note: does not delete buffer as this may be moved somewhere else @@ -754,36 +745,31 @@ void BufferMgr::removeSegment(BufferList::iterator& seg_it) { std::lock_guard unsized_segs_lock(unsized_segs_mutex_); unsized_segs_.erase(seg_it); } else { - auto& slab = getSlab(slab_num); - auto& slab_segs = slab.buffers_; - auto& free_segs_index = slab.free_buffs_index_; + auto& slab = slab_segments_[slab_num]; if (seg_it->mem_status == FREE) { - free_segs_index.remove(seg_it); + slab.index_remove(seg_it); } - - if (seg_it != slab_segs.begin()) { + if (seg_it != slab.begin()) { auto prev_it = std::prev(seg_it); // LOG(INFO) << "PrevIt: " << " " << getStringMgrType() << ":" << device_id_; // printSeg(prev_it); if (prev_it->mem_status == FREE) { seg_it->start_page = prev_it->start_page; seg_it->num_pages += prev_it->num_pages; - free_segs_index.remove(prev_it); - slab_segs.erase(prev_it); + slab.remove(prev_it); } } auto next_it = std::next(seg_it); - if (next_it != slab_segs.end()) { + if (next_it != slab.end()) { if (next_it->mem_status == FREE) { seg_it->num_pages += next_it->num_pages; - free_segs_index.remove(next_it); - slab_segs.erase(next_it); + slab.remove(next_it); } } seg_it->mem_status = FREE; // seg_it->pinCount = 0; seg_it->buffer = nullptr; - free_segs_index.insert(seg_it); + slab.index_insert(seg_it); } } @@ -946,14 +932,6 @@ void BufferMgr::getChunkMetadataVecForKeyPrefix(ChunkMetadataVector& chunk_metad LOG(FATAL) << "getChunkMetadataVecForPrefix not supported for BufferMgr."; } -const std::vector BufferMgr::getSlabSegments() { - std::vector res; - for (const auto& [slab_segs, _] : slab_segments_) { - res.push_back(slab_segs); - } - return res; -} - std::unique_ptr BufferMgr::getZeroCopyBufferMemory(const ChunkKey& key, size_t numBytes) { return parent_mgr_->getZeroCopyBufferMemory(key, numBytes); @@ -973,7 +951,7 @@ MemoryInfo BufferMgr::getMemoryInfo() { mi.isAllocationCapped = isAllocationCapped(); mi.numPageAllocated = getAllocated() / mi.pageSize; for (size_t slab_num = 0; slab_num < slab_segments_.size(); ++slab_num) { - for (auto segment : getSlab(slab_num).buffers_) { + for (auto segment : slab_segments_[slab_num]) { MemoryData md; md.slabNum = slab_num; md.startPage = segment.start_page; diff --git a/omniscidb/DataMgr/BufferMgr/BufferMgr.h b/omniscidb/DataMgr/BufferMgr/BufferMgr.h index 1532997bb6..e302b7c288 100644 --- a/omniscidb/DataMgr/BufferMgr/BufferMgr.h +++ b/omniscidb/DataMgr/BufferMgr/BufferMgr.h @@ -86,67 +86,51 @@ class TooBigForSlab : public OutOfMemory { using namespace Data_Namespace; namespace Buffer_Namespace { - -class SlabFreeBufIndex { - public: - SlabFreeBufIndex(BufferList& list_to_index) : list_to_index_(list_to_index) { - for (BufferList::iterator it = list_to_index_.begin(); it != list_to_index_.end(); - ++it) { - if (it->mem_status == MemStatus::FREE) { - insert(it); +struct Slab { + private: + static bool iterator_page_count_cmp_l(const SegmentList::iterator& lhs, + const size_t num_pages) { + return lhs->num_pages < num_pages; + } + struct SegmentListIterCmp { + bool operator()(const SegmentList::iterator& lhs, + const SegmentList::iterator& rhs) const { + if (lhs->num_pages < rhs->num_pages) { + return true; + } + if (lhs->num_pages > rhs->num_pages) { + return false; } + return (lhs->start_page < rhs->start_page); } - CHECK_GT(free_buffs_.size(), 0); - } + }; - BufferList::iterator getFreeBuff(const size_t num_requested_pages); + std::set free_segment_index_; + SegmentList segments_; - void insert(BufferList::iterator& slab_buffer_pos); + public: + Slab(const size_t page_count) : segments_(SegmentList{BufferSeg(0, page_count)}) { + free_segment_index_.insert(segments_.begin()); + } - void remove(BufferList::iterator& slab_buffer_pos); + SegmentList::iterator getFreeSegment(const size_t num_requested_pages); - void verify() const; + SegmentList::iterator insert(SegmentList::iterator& slab_buffer_pos, + BufferSeg& data_seg); - const std::vector& getFreeBuffs() const { return free_buffs_; }; - const BufferList& getListToIndex() const { return list_to_index_; }; + SegmentList::iterator remove(SegmentList::iterator slab_buffer_pos); - private: - static bool iterator_cmp(const BufferList::iterator& lhs, - const BufferList::iterator& rhs) { - return lhs->num_pages < rhs->num_pages; - } - static bool iterator_page_count_cmp_l(const BufferList::iterator& lhs, - const size_t num_pages) { - return lhs->num_pages < num_pages; - } - static bool iterator_page_count_cmp_u(const size_t num_pages, - const BufferList::iterator& lhs) { - return lhs->num_pages < num_pages; - } - std::vector::iterator findExact( - BufferList::iterator& slab_buffer_pos) { - auto index_pos = std::lower_bound( - free_buffs_.begin(), free_buffs_.end(), slab_buffer_pos, iterator_cmp); - while (index_pos != free_buffs_.end() && (*index_pos) != slab_buffer_pos && - (*index_pos)->num_pages == slab_buffer_pos->num_pages) { - index_pos++; - } - CHECK(index_pos != free_buffs_.end()); - CHECK((*index_pos) == slab_buffer_pos); - CHECK_EQ((*index_pos)->num_pages, slab_buffer_pos->num_pages); - return index_pos; - } - std::vector free_buffs_; - BufferList& list_to_index_; -}; + void index_insert(SegmentList::iterator& to_insert); + void index_remove(SegmentList::iterator& at); + void verify_index(); -struct Slab { - Slab(BufferList& buffers) : buffers_(buffers), free_buffs_index_(buffers_) {} - Slab(BufferList&& buffers) - : buffers_(std::move(buffers)), free_buffs_index_(buffers_) {} + SegmentList::iterator begin() { return segments_.begin(); } + SegmentList::iterator end() { return segments_.end(); } - BufferList buffers_; - SlabFreeBufIndex free_buffs_index_; + const std::set& getFreeSegments() const { + return free_segment_index_; + }; + const SegmentList& getListToIndex() const { return segments_; }; }; struct MemoryData { @@ -194,7 +178,7 @@ class BufferMgr : public AbstractBufferMgr { // implements void clearSlabs(); std::string printMap(); void printSegs(); - std::string printSeg(BufferList::iterator& seg_it); + std::string printSeg(SegmentList::iterator& seg_it); std::string keyToString(const ChunkKey& key); size_t getInUseSize() override; size_t getMaxSize() override; @@ -203,7 +187,6 @@ class BufferMgr : public AbstractBufferMgr { // implements size_t getMaxSlabSize(); size_t getPageSize(); bool isAllocationCapped() override; - const std::vector getSlabSegments(); /// Creates a chunk with the specified key and page size. AbstractBuffer* createBuffer(const ChunkKey& key, @@ -255,8 +238,8 @@ class BufferMgr : public AbstractBufferMgr { // implements size_t size(); size_t getNumChunks() override; - BufferList::iterator reserveBuffer(BufferList::iterator& seg_it, - const size_t num_bytes); + SegmentList::iterator reserveBuffer(SegmentList::iterator& seg_it, + const size_t num_bytes); void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector& chunk_metadata_vec, const ChunkKey& key_prefix) override; @@ -272,18 +255,18 @@ class BufferMgr : public AbstractBufferMgr { // implements const size_t page_size_; std::vector slabs_; /// vector of beginning memory addresses for each /// allocation of the buffer pool - std::list slab_segments_; + std::vector slab_segments_; private: BufferMgr(const BufferMgr&); // private copy constructor BufferMgr& operator=(const BufferMgr&); // private assignment - void removeSegment(BufferList::iterator& seg_it); - BufferList::iterator findFreeBufferInSlab(const size_t slab_num, - const size_t num_pages_requested); + void removeSegment(SegmentList::iterator& seg_it); + SegmentList::iterator findFreeSegmentInSlab(const size_t slab_num, + const size_t num_pages_requested); int getBufferId(); virtual void addSlab(const size_t slab_size) = 0; virtual void freeAllMem() = 0; - virtual void allocateBuffer(BufferList::iterator seg_it, + virtual void allocateBuffer(SegmentList::iterator seg_it, const size_t page_size, const size_t num_bytes) = 0; virtual AbstractBuffer* allocateZeroCopyBuffer( @@ -303,7 +286,7 @@ class BufferMgr : public AbstractBufferMgr { // implements // to this map should be synced throug chunk_index_mutex_. std::map> in_progress_buffer_cvs_; - std::map chunk_index_; + std::map chunk_index_; size_t max_buffer_pool_num_pages_; // max number of pages for buffer pool size_t num_pages_allocated_; size_t min_num_pages_per_slab_; @@ -314,13 +297,12 @@ class BufferMgr : public AbstractBufferMgr { // implements int max_buffer_id_; unsigned int buffer_epoch_; - BufferList unsized_segs_; + SegmentList unsized_segs_; - BufferList::iterator evict(BufferList::iterator& evict_start, - const size_t num_pages_requested, - const int slab_num); + SegmentList::iterator evict(SegmentList::iterator& evict_start, + const size_t num_pages_requested, + const int slab_num); - Slab& getSlab(const size_t slab_num); /** * @brief Gets a buffer of required size and returns an iterator to it * @@ -334,7 +316,7 @@ class BufferMgr : public AbstractBufferMgr { // implements * USED if applicable * */ - BufferList::iterator findFreeBuffer(size_t num_bytes); + SegmentList::iterator findFreeSegment(size_t num_bytes); }; } // namespace Buffer_Namespace diff --git a/omniscidb/DataMgr/BufferMgr/BufferSeg.h b/omniscidb/DataMgr/BufferMgr/BufferSeg.h index dcfe8a5aae..4c34a4ec2b 100644 --- a/omniscidb/DataMgr/BufferMgr/BufferSeg.h +++ b/omniscidb/DataMgr/BufferMgr/BufferSeg.h @@ -68,5 +68,5 @@ struct BufferSeg { , last_touched(last_touched) {} }; -using BufferList = std::list; +using SegmentList = std::list; } // namespace Buffer_Namespace diff --git a/omniscidb/DataMgr/BufferMgr/CpuBufferMgr/CpuBuffer.cpp b/omniscidb/DataMgr/BufferMgr/CpuBufferMgr/CpuBuffer.cpp index 396daddb07..5b5bfd567e 100644 --- a/omniscidb/DataMgr/BufferMgr/CpuBufferMgr/CpuBuffer.cpp +++ b/omniscidb/DataMgr/BufferMgr/CpuBufferMgr/CpuBuffer.cpp @@ -25,7 +25,7 @@ namespace Buffer_Namespace { CpuBuffer::CpuBuffer(BufferMgr* bm, - BufferList::iterator segment_iter, + SegmentList::iterator segment_iter, const int device_id, GpuMgr* gpu_mgr, const size_t page_size, diff --git a/omniscidb/DataMgr/BufferMgr/CpuBufferMgr/CpuBuffer.h b/omniscidb/DataMgr/BufferMgr/CpuBufferMgr/CpuBuffer.h index 4700af474e..d46bbd697e 100644 --- a/omniscidb/DataMgr/BufferMgr/CpuBufferMgr/CpuBuffer.h +++ b/omniscidb/DataMgr/BufferMgr/CpuBufferMgr/CpuBuffer.h @@ -23,7 +23,7 @@ namespace Buffer_Namespace { class CpuBuffer : public Buffer { public: CpuBuffer(BufferMgr* bm, - BufferList::iterator segment_iter, + SegmentList::iterator segment_iter, const int device_id, GpuMgr* gpu_mgr, const size_t page_size = 512, diff --git a/omniscidb/DataMgr/BufferMgr/CpuBufferMgr/CpuBufferMgr.cpp b/omniscidb/DataMgr/BufferMgr/CpuBufferMgr/CpuBufferMgr.cpp index ba63a0f74f..1ce3dbbe30 100644 --- a/omniscidb/DataMgr/BufferMgr/CpuBufferMgr/CpuBufferMgr.cpp +++ b/omniscidb/DataMgr/BufferMgr/CpuBufferMgr/CpuBufferMgr.cpp @@ -30,7 +30,7 @@ void CpuBufferMgr::addSlab(const size_t slab_size) { slabs_.resize(slabs_.size() - 1); throw FailedToCreateSlab(slab_size); } - slab_segments_.emplace_back(BufferList{BufferSeg(0, slab_size / page_size_)}); + slab_segments_.emplace_back(slab_size / page_size_); } void CpuBufferMgr::freeAllMem() { @@ -38,7 +38,7 @@ void CpuBufferMgr::freeAllMem() { initializeMem(); } -void CpuBufferMgr::allocateBuffer(BufferList::iterator seg_it, +void CpuBufferMgr::allocateBuffer(SegmentList::iterator seg_it, const size_t page_size, const size_t initial_size) { new CpuBuffer(this, diff --git a/omniscidb/DataMgr/BufferMgr/CpuBufferMgr/CpuBufferMgr.h b/omniscidb/DataMgr/BufferMgr/CpuBufferMgr/CpuBufferMgr.h index e30d9a48d5..ace2bf995f 100644 --- a/omniscidb/DataMgr/BufferMgr/CpuBufferMgr/CpuBufferMgr.h +++ b/omniscidb/DataMgr/BufferMgr/CpuBufferMgr/CpuBufferMgr.h @@ -55,7 +55,7 @@ class CpuBufferMgr : public BufferMgr { protected: void addSlab(const size_t slab_size) override; void freeAllMem() override; - void allocateBuffer(BufferList::iterator segment_iter, + void allocateBuffer(SegmentList::iterator segment_iter, const size_t page_size, const size_t initial_size) override; virtual void initializeMem(); diff --git a/omniscidb/DataMgr/BufferMgr/CpuBufferMgr/TieredCpuBufferMgr.cpp b/omniscidb/DataMgr/BufferMgr/CpuBufferMgr/TieredCpuBufferMgr.cpp index 08d0764221..6ebecfeb19 100644 --- a/omniscidb/DataMgr/BufferMgr/CpuBufferMgr/TieredCpuBufferMgr.cpp +++ b/omniscidb/DataMgr/BufferMgr/CpuBufferMgr/TieredCpuBufferMgr.cpp @@ -89,7 +89,7 @@ void TieredCpuBufferMgr::addSlab(const size_t slab_size) { } if (allocated_slab) { // We allocated a new slab, so add segments for it. - slab_segments_.emplace_back(BufferList{BufferSeg(0, slab_size / page_size_)}); + slab_segments_.emplace_back(slab_size / page_size_); LOG(INFO) << "Allocated slab using " << tier_to_string(last_tier) << "."; } else { // None of the allocators allocated a slab, so revert to original size and throw. diff --git a/omniscidb/DataMgr/BufferMgr/GpuBufferMgr/GpuBuffer.cpp b/omniscidb/DataMgr/BufferMgr/GpuBufferMgr/GpuBuffer.cpp index 3fa8f9f7a2..d689cf888d 100644 --- a/omniscidb/DataMgr/BufferMgr/GpuBufferMgr/GpuBuffer.cpp +++ b/omniscidb/DataMgr/BufferMgr/GpuBufferMgr/GpuBuffer.cpp @@ -22,7 +22,7 @@ namespace Buffer_Namespace { GpuBuffer::GpuBuffer(BufferMgr* bm, - BufferList::iterator seg_it, + SegmentList::iterator seg_it, const int device_id, GpuMgr* gpu_mgr, const size_t page_size, diff --git a/omniscidb/DataMgr/BufferMgr/GpuBufferMgr/GpuBuffer.h b/omniscidb/DataMgr/BufferMgr/GpuBufferMgr/GpuBuffer.h index 1640721819..3a8cbcea2e 100644 --- a/omniscidb/DataMgr/BufferMgr/GpuBufferMgr/GpuBuffer.h +++ b/omniscidb/DataMgr/BufferMgr/GpuBufferMgr/GpuBuffer.h @@ -24,7 +24,7 @@ namespace Buffer_Namespace { class GpuBuffer : public Buffer { public: GpuBuffer(BufferMgr* bm, - BufferList::iterator seg_it, + SegmentList::iterator seg_it, const int device_id, GpuMgr* gpu_mgr, const size_t page_size = 512, diff --git a/omniscidb/DataMgr/BufferMgr/GpuBufferMgr/GpuBufferMgr.cpp b/omniscidb/DataMgr/BufferMgr/GpuBufferMgr/GpuBufferMgr.cpp index 6f477b59c4..d7f6f116af 100644 --- a/omniscidb/DataMgr/BufferMgr/GpuBufferMgr/GpuBufferMgr.cpp +++ b/omniscidb/DataMgr/BufferMgr/GpuBufferMgr/GpuBufferMgr.cpp @@ -63,7 +63,7 @@ void GpuBufferMgr::addSlab(const size_t slab_size) { slabs_.resize(slabs_.size() - 1); throw FailedToCreateSlab(slab_size); } - slab_segments_.emplace_back(BufferList{BufferSeg(0, slab_size / page_size_)}); + slab_segments_.emplace_back(slab_size / page_size_); } void GpuBufferMgr::freeAllMem() { @@ -72,7 +72,7 @@ void GpuBufferMgr::freeAllMem() { } } -void GpuBufferMgr::allocateBuffer(BufferList::iterator seg_it, +void GpuBufferMgr::allocateBuffer(SegmentList::iterator seg_it, const size_t page_size, const size_t initial_size) { new GpuBuffer(this, diff --git a/omniscidb/DataMgr/BufferMgr/GpuBufferMgr/GpuBufferMgr.h b/omniscidb/DataMgr/BufferMgr/GpuBufferMgr/GpuBufferMgr.h index c09fd8fe1e..6335555628 100644 --- a/omniscidb/DataMgr/BufferMgr/GpuBufferMgr/GpuBufferMgr.h +++ b/omniscidb/DataMgr/BufferMgr/GpuBufferMgr/GpuBufferMgr.h @@ -41,7 +41,7 @@ class GpuBufferMgr : public BufferMgr { private: void addSlab(const size_t slab_size) override; void freeAllMem() override; - void allocateBuffer(BufferList::iterator seg_it, + void allocateBuffer(SegmentList::iterator seg_it, const size_t page_size, const size_t initial_size) override; GpuMgr* gpu_mgr_; diff --git a/omniscidb/QueryEngine/RelAlgExecutor.cpp b/omniscidb/QueryEngine/RelAlgExecutor.cpp index 2a48b7e2f7..48f515328f 100644 --- a/omniscidb/QueryEngine/RelAlgExecutor.cpp +++ b/omniscidb/QueryEngine/RelAlgExecutor.cpp @@ -1448,11 +1448,12 @@ ExecutionResult RelAlgExecutor::executeWorkUnit( } const auto body = work_unit.body; CHECK(body); + auto timer1 = DEBUG_TIMER("get_table_infos"); const auto table_infos = get_table_infos(work_unit.exe_unit, executor_); + auto timer2 = DEBUG_TIMER("ALL"); auto ra_exe_unit = decide_approx_count_distinct_implementation( work_unit.exe_unit, table_infos, executor_, co.device_type, target_exprs_owned_); - auto max_groups_buffer_entry_guess = work_unit.max_groups_buffer_entry_guess; if (is_window_execution_unit(ra_exe_unit)) { CHECK_EQ(table_infos.size(), size_t(1)); @@ -1484,6 +1485,7 @@ ExecutionResult RelAlgExecutor::executeWorkUnit( eo.output_columnar_hint = true; } } + timer2.stop(); ExecutionResult result; auto execute_and_handle_errors = [&](const auto max_groups_buffer_entry_guess_in,