From 3ac48fe29500236af4ecb2c50d41888d784bee81 Mon Sep 17 00:00:00 2001 From: Dmitrii Makarenko Date: Tue, 1 Aug 2023 06:58:50 +0000 Subject: [PATCH] [Join] Zero-copy storage column check. This commit adds check for valid pointer to column buffer to skip copying. Most effective with enabled `enable-non-lazy-data-import` option. Checks number of chunks in storage. Partially resolves: #574 Signed-off-by: Dmitrii Makarenko --- omniscidb/ArrowStorage/ArrowStorage.cpp | 45 +++++++++++++++++++ omniscidb/ArrowStorage/ArrowStorage.h | 3 ++ omniscidb/DataMgr/AbstractBufferMgr.h | 2 + omniscidb/DataMgr/AbstractDataProvider.h | 7 +++ omniscidb/DataMgr/BufferMgr/BufferMgr.cpp | 5 +++ omniscidb/DataMgr/BufferMgr/BufferMgr.h | 3 ++ omniscidb/DataMgr/DataMgr.cpp | 7 +++ omniscidb/DataMgr/DataMgr.h | 2 + omniscidb/DataMgr/DataMgrDataProvider.cpp | 7 +++ omniscidb/DataMgr/DataMgrDataProvider.h | 3 ++ .../PersistentStorageMgr.cpp | 5 +++ .../PersistentStorageMgr.h | 2 + omniscidb/DataProvider/DataProvider.h | 5 +++ omniscidb/QueryEngine/ColumnFetcher.cpp | 37 +++++++++------ omniscidb/QueryEngine/NativeCodegen.cpp | 5 +-- omniscidb/ResultSet/RowSetMemoryOwner.h | 11 +++++ .../ResultSetRegistry/ColumnarResults.cpp | 21 ++++++++- omniscidb/ResultSetRegistry/ColumnarResults.h | 5 +++ 18 files changed, 157 insertions(+), 18 deletions(-) diff --git a/omniscidb/ArrowStorage/ArrowStorage.cpp b/omniscidb/ArrowStorage/ArrowStorage.cpp index 5456b3a779..6a36e2f3ee 100644 --- a/omniscidb/ArrowStorage/ArrowStorage.cpp +++ b/omniscidb/ArrowStorage/ArrowStorage.cpp @@ -187,6 +187,51 @@ std::unique_ptr ArrowStorage::getZeroCopyBufferMemory( return nullptr; } +std::unique_ptr ArrowStorage::getZeroCopyColumnData( + const ColumnRef& col_ref) { + mapd_shared_lock data_lock(data_mutex_); + CHECK_EQ(col_ref.db_id, db_id_); + CHECK_EQ(tables_.count(col_ref.table_id), (size_t)1); + auto& table = *tables_.at(col_ref.table_id); + mapd_shared_lock table_lock(table.mutex); + data_lock.unlock(); + + auto col_type = getColumnInfo(col_ref.db_id, col_ref.table_id, col_ref.column_id)->type; + + if (col_type->isExtDictionary()) { + auto dict_id = col_type->as()->dictId(); + auto dict_descriptor = getDictMetadata( + dict_id); // this will force materialize the dictionary. it is thread safe + CHECK(dict_descriptor); + } + + if (!col_type->isVarLen()) { + size_t col_idx = columnIndex(col_ref.column_id); + if (col_idx >= table.col_data.size()) { + return nullptr; + } + size_t elem_size = col_type->size(); + const auto* fixed_type = + dynamic_cast(table.col_data[col_idx]->type().get()); + CHECK(fixed_type) << table.col_data[col_idx]->type()->ToString() << " (table " + << col_ref.table_id << ", column " << col_idx << ")"; + size_t arrow_elem_size = fixed_type->bit_width() / 8; + size_t elems = elem_size / arrow_elem_size; + CHECK_GT(elems, (size_t)0); + const auto& data_to_fetch = table.col_data[col_idx]; + if (data_to_fetch->num_chunks() == 1) { + auto chunk = data_to_fetch->chunk(0); + const int8_t* ptr = + chunk->data()->GetValues(1, chunk->data()->offset * arrow_elem_size); + size_t chunk_size = chunk->length() * arrow_elem_size; + return std::make_unique( + std::move(chunk), col_type, ptr, chunk_size); + } + } + + return nullptr; +} + void ArrowStorage::fetchFixedLenData(const TableData& table, size_t frag_idx, size_t col_idx, diff --git a/omniscidb/ArrowStorage/ArrowStorage.h b/omniscidb/ArrowStorage/ArrowStorage.h index 4d4c9b6600..deffa0a6a5 100644 --- a/omniscidb/ArrowStorage/ArrowStorage.h +++ b/omniscidb/ArrowStorage/ArrowStorage.h @@ -70,6 +70,9 @@ class ArrowStorage : public SimpleSchemaProvider, public AbstractDataProvider { const ChunkKey& key, size_t num_bytes) override; + std::unique_ptr getZeroCopyColumnData( + const ColumnRef& col_ref) override; + TableFragmentsInfo getTableMetadata(int db_id, int table_id) const override; const DictDescriptor* getDictMetadata(int dict_id, bool load_dict = true) override; diff --git a/omniscidb/DataMgr/AbstractBufferMgr.h b/omniscidb/DataMgr/AbstractBufferMgr.h index 214b1a647b..7c29dee2b1 100644 --- a/omniscidb/DataMgr/AbstractBufferMgr.h +++ b/omniscidb/DataMgr/AbstractBufferMgr.h @@ -86,6 +86,8 @@ class AbstractBufferMgr { virtual AbstractBuffer* getBuffer(const ChunkKey& key, const size_t numBytes = 0) = 0; virtual std::unique_ptr getZeroCopyBufferMemory(const ChunkKey& key, size_t numBytes) = 0; + virtual std::unique_ptr getZeroCopyColumnData( + const ColumnRef& col_ref) = 0; virtual void fetchBuffer(const ChunkKey& key, AbstractBuffer* destBuffer, const size_t numBytes = 0) = 0; diff --git a/omniscidb/DataMgr/AbstractDataProvider.h b/omniscidb/DataMgr/AbstractDataProvider.h index 6061e8030f..72d330cfb4 100644 --- a/omniscidb/DataMgr/AbstractDataProvider.h +++ b/omniscidb/DataMgr/AbstractDataProvider.h @@ -45,6 +45,13 @@ class AbstractDataProvider : public Data_Namespace::AbstractBufferMgr { return nullptr; } + // TODO(dmitriim) remove this method after enabling + // of hashtable, that takes into a count frag_id and offset + std::unique_ptr getZeroCopyColumnData( + const ColumnRef& col_ref) override { + return nullptr; + } + void deleteBuffer(const ChunkKey& key, const bool purge = true) override { UNREACHABLE(); } diff --git a/omniscidb/DataMgr/BufferMgr/BufferMgr.cpp b/omniscidb/DataMgr/BufferMgr/BufferMgr.cpp index f7db36956d..fc710d462e 100644 --- a/omniscidb/DataMgr/BufferMgr/BufferMgr.cpp +++ b/omniscidb/DataMgr/BufferMgr/BufferMgr.cpp @@ -865,6 +865,11 @@ std::unique_ptr BufferMgr::getZeroCopyBufferMemory(const Chun return parent_mgr_->getZeroCopyBufferMemory(key, numBytes); } +std::unique_ptr BufferMgr::getZeroCopyColumnData( + const ColumnRef& col_ref) { + return parent_mgr_->getZeroCopyColumnData(col_ref); +} + MemoryInfo BufferMgr::getMemoryInfo() { std::unique_lock sized_segs_lock(sized_segs_mutex_); MemoryInfo mi; diff --git a/omniscidb/DataMgr/BufferMgr/BufferMgr.h b/omniscidb/DataMgr/BufferMgr/BufferMgr.h index f864b3b8f8..ec871e68f5 100644 --- a/omniscidb/DataMgr/BufferMgr/BufferMgr.h +++ b/omniscidb/DataMgr/BufferMgr/BufferMgr.h @@ -162,6 +162,9 @@ class BufferMgr : public AbstractBufferMgr { // implements std::unique_ptr getZeroCopyBufferMemory(const ChunkKey& key, size_t numBytes) override; + std::unique_ptr getZeroCopyColumnData( + const ColumnRef& col_ref) override; + /** * @brief Puts the contents of d into the Buffer with ChunkKey key. * @param key - Unique identifier for a Chunk. diff --git a/omniscidb/DataMgr/DataMgr.cpp b/omniscidb/DataMgr/DataMgr.cpp index d406a23785..60d31ec070 100644 --- a/omniscidb/DataMgr/DataMgr.cpp +++ b/omniscidb/DataMgr/DataMgr.cpp @@ -458,6 +458,13 @@ AbstractBuffer* DataMgr::getChunkBuffer(const ChunkKey& key, return bufferMgrs_[level][deviceId]->getBuffer(key, numBytes); } +std::unique_ptr DataMgr::getZeroCopyColumnData( + const ColumnRef& col_ref) { + const auto level = static_cast(Data_Namespace::CPU_LEVEL); + CHECK_LT(level, levelSizes_.size()); // make sure we have a legit buffermgr + return bufferMgrs_[level][0]->getZeroCopyColumnData(col_ref); +} + void DataMgr::deleteChunksWithPrefix(const ChunkKey& keyPrefix) { int numLevels = bufferMgrs_.size(); for (int level = numLevels - 1; level >= 0; --level) { diff --git a/omniscidb/DataMgr/DataMgr.h b/omniscidb/DataMgr/DataMgr.h index 82fc72f799..c98672645a 100644 --- a/omniscidb/DataMgr/DataMgr.h +++ b/omniscidb/DataMgr/DataMgr.h @@ -167,6 +167,8 @@ class DataMgr { const MemoryLevel memoryLevel, const int deviceId = 0, const size_t numBytes = 0); + // TODO(dmitriim) remove this method after enabling of hashtable + std::unique_ptr getZeroCopyColumnData(const ColumnRef& col_ref); void deleteChunksWithPrefix(const ChunkKey& keyPrefix); void deleteChunksWithPrefix(const ChunkKey& keyPrefix, const MemoryLevel memLevel); AbstractBuffer* alloc(const MemoryLevel memoryLevel, diff --git a/omniscidb/DataMgr/DataMgrDataProvider.cpp b/omniscidb/DataMgr/DataMgrDataProvider.cpp index 0f2f323a03..3a8d003b8a 100644 --- a/omniscidb/DataMgr/DataMgrDataProvider.cpp +++ b/omniscidb/DataMgr/DataMgrDataProvider.cpp @@ -29,9 +29,16 @@ std::shared_ptr DataMgrDataProvider::getChunk( return Chunk_NS::Chunk::getChunk( col_info, data_mgr_, key, memory_level, device_id, num_bytes, num_elems); } + +std::unique_ptr +DataMgrDataProvider::getZeroCopyColumnData(const ColumnRef& col_ref) { + return data_mgr_->getZeroCopyColumnData(col_ref); +} + TableFragmentsInfo DataMgrDataProvider::getTableMetadata(int db_id, int table_id) const { return data_mgr_->getTableMetadata(db_id, table_id); } + const DictDescriptor* DataMgrDataProvider::getDictMetadata(int dict_id, bool load_dict) const { return data_mgr_->getDictMetadata(dict_id, load_dict); diff --git a/omniscidb/DataMgr/DataMgrDataProvider.h b/omniscidb/DataMgr/DataMgrDataProvider.h index d04b31bc48..d96daa3629 100644 --- a/omniscidb/DataMgr/DataMgrDataProvider.h +++ b/omniscidb/DataMgr/DataMgrDataProvider.h @@ -35,6 +35,9 @@ class DataMgrDataProvider : public DataProvider { const size_t num_bytes, const size_t num_elems) override; + std::unique_ptr getZeroCopyColumnData( + const ColumnRef& col_ref) override; + TableFragmentsInfo getTableMetadata(int db_id, int table_id) const override; const DictDescriptor* getDictMetadata(int dict_id, diff --git a/omniscidb/DataMgr/PersistentStorageMgr/PersistentStorageMgr.cpp b/omniscidb/DataMgr/PersistentStorageMgr/PersistentStorageMgr.cpp index b1a829161f..9962451eeb 100644 --- a/omniscidb/DataMgr/PersistentStorageMgr/PersistentStorageMgr.cpp +++ b/omniscidb/DataMgr/PersistentStorageMgr/PersistentStorageMgr.cpp @@ -55,6 +55,11 @@ std::unique_ptr PersistentStorageMgr::getZeroCopyBufferMemory return getStorageMgrForTableKey(key)->getZeroCopyBufferMemory(key, numBytes); } +std::unique_ptr PersistentStorageMgr::getZeroCopyColumnData( + const ColumnRef& col_ref) { + return getStorageMgr(col_ref.db_id)->getZeroCopyColumnData(col_ref); +} + void PersistentStorageMgr::fetchBuffer(const ChunkKey& chunk_key, AbstractBuffer* destination_buffer, const size_t num_bytes) { diff --git a/omniscidb/DataMgr/PersistentStorageMgr/PersistentStorageMgr.h b/omniscidb/DataMgr/PersistentStorageMgr/PersistentStorageMgr.h index 662ee74ced..ad3b53890f 100644 --- a/omniscidb/DataMgr/PersistentStorageMgr/PersistentStorageMgr.h +++ b/omniscidb/DataMgr/PersistentStorageMgr/PersistentStorageMgr.h @@ -35,6 +35,8 @@ class PersistentStorageMgr : public AbstractBufferMgr { AbstractBuffer* getBuffer(const ChunkKey& chunk_key, const size_t num_bytes) override; std::unique_ptr getZeroCopyBufferMemory(const ChunkKey& key, size_t numBytes) override; + std::unique_ptr getZeroCopyColumnData( + const ColumnRef& col_ref) override; void fetchBuffer(const ChunkKey& chunk_key, AbstractBuffer* destination_buffer, const size_t num_bytes) override; diff --git a/omniscidb/DataProvider/DataProvider.h b/omniscidb/DataProvider/DataProvider.h index 550ad2cd29..9bf45068a6 100644 --- a/omniscidb/DataProvider/DataProvider.h +++ b/omniscidb/DataProvider/DataProvider.h @@ -36,6 +36,11 @@ class DataProvider { const size_t num_bytes, const size_t num_elems) = 0; + // CPU only + // TODO(dmitriim) remove this method after enabling of hashtable + virtual std::unique_ptr getZeroCopyColumnData( + const ColumnRef& col_ref) = 0; + virtual TableFragmentsInfo getTableMetadata(int db_id, int table_id) const = 0; virtual const DictDescriptor* getDictMetadata(int dict_id, diff --git a/omniscidb/QueryEngine/ColumnFetcher.cpp b/omniscidb/QueryEngine/ColumnFetcher.cpp index 7b3ad529d4..f95223edf7 100644 --- a/omniscidb/QueryEngine/ColumnFetcher.cpp +++ b/omniscidb/QueryEngine/ColumnFetcher.cpp @@ -213,22 +213,20 @@ const int8_t* ColumnFetcher::getOneTableColumnFragment( auto& chunk_iter = chunk_iter_holder.back(); if (memory_level == Data_Namespace::CPU_LEVEL) { return reinterpret_cast(&chunk_iter); - } else { - auto ab = chunk->getBuffer(); - auto& row_set_mem_owner = executor_->getRowSetMemoryOwner(); - row_set_mem_owner->addVarlenInputBuffer(ab); - CHECK_EQ(Data_Namespace::GPU_LEVEL, memory_level); - CHECK(allocator); - auto chunk_iter_gpu = allocator->alloc(sizeof(ChunkIter)); - allocator->copyToDevice( - chunk_iter_gpu, reinterpret_cast(&chunk_iter), sizeof(ChunkIter)); - return chunk_iter_gpu; } - } else { auto ab = chunk->getBuffer(); - CHECK(ab->getMemoryPtr()); - return ab->getMemoryPtr(); // @TODO(alex) change to use ChunkIter + auto& row_set_mem_owner = executor_->getRowSetMemoryOwner(); + row_set_mem_owner->addVarlenInputBuffer(ab); + CHECK_EQ(Data_Namespace::GPU_LEVEL, memory_level); + CHECK(allocator); + auto chunk_iter_gpu = allocator->alloc(sizeof(ChunkIter)); + allocator->copyToDevice( + chunk_iter_gpu, reinterpret_cast(&chunk_iter), sizeof(ChunkIter)); + return chunk_iter_gpu; } + auto ab = chunk->getBuffer(); + CHECK(ab->getMemoryPtr()); + return ab->getMemoryPtr(); // @TODO(alex) change to use ChunkIter } const int8_t* ColumnFetcher::getAllTableColumnFragments( @@ -250,6 +248,19 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments( const InputDescriptor table_desc(db_id, table_id, int(0)); { std::lock_guard columnar_conversion_guard(columnar_fetch_mutex_); + + auto col_token = data_provider_->getZeroCopyColumnData(*col_info); + if (col_token != nullptr) { + size_t num_rows = col_token->getSize() / col_token->getType()->size(); + auto raw_mem_ptr = + executor_->row_set_mem_owner_->saveDataToken(std::move(col_token)); + ColumnarResults res( + {const_cast(raw_mem_ptr)}, num_rows, col_info->type, thread_idx); + + return ColumnFetcher::transferColumnIfNeeded( + &res, 0, memory_level, device_id, device_allocator); + } + auto column_it = columnarized_scan_table_cache_.find({table_id, col_id}); if (column_it == columnarized_scan_table_cache_.end()) { for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) { diff --git a/omniscidb/QueryEngine/NativeCodegen.cpp b/omniscidb/QueryEngine/NativeCodegen.cpp index 9bffebfc49..eb46d492d5 100644 --- a/omniscidb/QueryEngine/NativeCodegen.cpp +++ b/omniscidb/QueryEngine/NativeCodegen.cpp @@ -711,9 +711,8 @@ std::vector get_agg_fnames( CHECK(target_expr); auto target_type = target_expr->type(); const auto agg_expr = target_expr->as(); - const bool is_varlen = - target_type->isString() || - target_type->isArray(); // TODO: should it use is_varlen_array() ? + // Fixed length arrays are also included here. + const bool is_varlen = target_type->isString() || target_type->isArray(); if (!agg_expr || agg_expr->aggType() == hdk::ir::AggType::kSample) { result.emplace_back(target_type->isFloatingPoint() ? "agg_id_double" : "agg_id"); if (is_varlen) { diff --git a/omniscidb/ResultSet/RowSetMemoryOwner.h b/omniscidb/ResultSet/RowSetMemoryOwner.h index 5a99772280..db7993f1ba 100644 --- a/omniscidb/ResultSet/RowSetMemoryOwner.h +++ b/omniscidb/ResultSet/RowSetMemoryOwner.h @@ -97,6 +97,16 @@ class RowSetMemoryOwner final : public SimpleAllocator, boost::noncopyable { varlen_buffers_.push_back(varlen_buffer); } + const int8_t* saveDataToken(std::unique_ptr token) { + std::lock_guard lock(state_mutex_); + auto mem_ptr = token->getMemoryPtr(); + if (data_tokens_.count(mem_ptr) == 0) { + data_tokens_.insert({mem_ptr, std::move(token)}); + } + + return data_tokens_[mem_ptr]->getMemoryPtr(); + } + /** * Adds a GPU buffer containing a variable length input column. Variable length inputs * on GPU are referenced in output projected targets and should not be freed until the @@ -258,6 +268,7 @@ class RowSetMemoryOwner final : public SimpleAllocator, boost::noncopyable { std::vector col_buffers_; std::vector varlen_input_buffers_; std::vector> t_digests_; + std::unordered_map> data_tokens_; DataProvider* data_provider_; // for metadata lookups size_t arena_block_size_; // for cloning diff --git a/omniscidb/ResultSetRegistry/ColumnarResults.cpp b/omniscidb/ResultSetRegistry/ColumnarResults.cpp index 587fde0e90..d562aa186f 100644 --- a/omniscidb/ResultSetRegistry/ColumnarResults.cpp +++ b/omniscidb/ResultSetRegistry/ColumnarResults.cpp @@ -134,9 +134,8 @@ ColumnarResults::ColumnarResults(std::shared_ptr row_set_mem_ , direct_columnar_conversion_(false) , thread_idx_(thread_idx) { auto timer = DEBUG_TIMER(__func__); - const bool is_varlen = target_type->isArray() || target_type->isString(); - if (is_varlen) { + if (target_type->isVarLen()) { throw ColumnarConversionNotSupported(); } const auto buf_size = num_rows * target_type->size(); @@ -145,6 +144,24 @@ ColumnarResults::ColumnarResults(std::shared_ptr row_set_mem_ memcpy(((void*)column_buffers_[0]), one_col_buffer, buf_size); } +ColumnarResults::ColumnarResults(const std::vector one_col_buffer, + const size_t num_rows, + const hdk::ir::Type* target_type, + const size_t thread_idx) + : column_buffers_(1) + , num_rows_(num_rows) + , target_types_{target_type} + , parallel_conversion_(false) + , direct_columnar_conversion_(false) + , thread_idx_(thread_idx) { + auto timer = DEBUG_TIMER(__func__); + + if (target_type->isVarLen()) { + throw ColumnarConversionNotSupported(); + } + column_buffers_ = std::move(one_col_buffer); +} + std::unique_ptr ColumnarResults::mergeResults( std::shared_ptr row_set_mem_owner, const std::vector>& sub_results) { diff --git a/omniscidb/ResultSetRegistry/ColumnarResults.h b/omniscidb/ResultSetRegistry/ColumnarResults.h index 746a9bad11..ced7ac30e9 100644 --- a/omniscidb/ResultSetRegistry/ColumnarResults.h +++ b/omniscidb/ResultSetRegistry/ColumnarResults.h @@ -73,6 +73,11 @@ class ColumnarResults { const hdk::ir::Type* target_type, const size_t thread_idx); + ColumnarResults(const std::vector one_col_buffer, + const size_t num_rows, + const hdk::ir::Type* target_type, + const size_t thread_idx); + static std::unique_ptr mergeResults( const std::shared_ptr row_set_mem_owner, const std::vector>& sub_results);