From d33c4275663b117b06425fe20e54aa827af47e3d Mon Sep 17 00:00:00 2001 From: Dmitrii Makarenko Date: Tue, 1 Aug 2023 06:58:50 +0000 Subject: [PATCH] [Join] Remove redundant copies. This commit removes useless copying(memcpy) in `getAllTableColumnFragments`. Also some parallelization added. Resolves: #574 Signed-off-by: Dmitrii Makarenko --- omniscidb/QueryEngine/ColumnFetcher.cpp | 130 +++++++++++++----- .../ResultSetRegistry/ColumnarResults.cpp | 22 --- omniscidb/ResultSetRegistry/ColumnarResults.h | 6 - 3 files changed, 96 insertions(+), 62 deletions(-) diff --git a/omniscidb/QueryEngine/ColumnFetcher.cpp b/omniscidb/QueryEngine/ColumnFetcher.cpp index f95223edf..0469b8c45 100644 --- a/omniscidb/QueryEngine/ColumnFetcher.cpp +++ b/omniscidb/QueryEngine/ColumnFetcher.cpp @@ -16,6 +16,7 @@ #include "QueryEngine/ColumnFetcher.h" +//#include #include #include "DataMgr/ArrayNoneEncoder.h" @@ -239,8 +240,10 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments( int db_id = col_info->db_id; int table_id = col_info->table_id; int col_id = col_info->column_id; + const auto fragments_it = all_tables_fragments.find({db_id, table_id}); CHECK(fragments_it != all_tables_fragments.end()); + const auto fragments = fragments_it->second; const auto frag_count = fragments->size(); std::vector> column_frags; @@ -248,7 +251,6 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments( const InputDescriptor table_desc(db_id, table_id, int(0)); { std::lock_guard columnar_conversion_guard(columnar_fetch_mutex_); - auto col_token = data_provider_->getZeroCopyColumnData(*col_info); if (col_token != nullptr) { size_t num_rows = col_token->getSize() / col_token->getType()->size(); @@ -262,44 +264,104 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments( } auto column_it = columnarized_scan_table_cache_.find({table_id, col_id}); - if (column_it == columnarized_scan_table_cache_.end()) { - for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) { - if (executor_->getConfig() - .exec.interrupt.enable_non_kernel_time_query_interrupt && - executor_->checkNonKernelTimeInterrupted()) { - throw QueryExecutionError(Executor::ERR_INTERRUPTED); - } - std::list> chunk_holder; - std::list chunk_iter_holder; - const auto& fragment = (*fragments)[frag_id]; - if (fragment.isEmptyPhysicalFragment()) { - continue; - } - auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id); - CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end()); - auto col_buffer = getOneTableColumnFragment(col_info, - static_cast(frag_id), - all_tables_fragments, - chunk_holder, - chunk_iter_holder, - Data_Namespace::CPU_LEVEL, - int(0), - device_allocator); - column_frags.push_back( - std::make_unique(executor_->row_set_mem_owner_, - col_buffer, - fragment.getNumTuples(), - chunk_meta_it->second->type(), - thread_idx)); + if (column_it != columnarized_scan_table_cache_.end()) { + table_column = column_it->second.get(); + return ColumnFetcher::transferColumnIfNeeded( + table_column, 0, memory_level, device_id, device_allocator); + } + + size_t total_row_count = 0; + for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) { + if (executor_->getConfig().exec.interrupt.enable_non_kernel_time_query_interrupt && + executor_->checkNonKernelTimeInterrupted()) { + throw QueryExecutionError(Executor::ERR_INTERRUPTED); } - auto merged_results = - ColumnarResults::mergeResults(executor_->row_set_mem_owner_, column_frags); + const auto& fragment = (*fragments)[frag_id]; + const auto& rows_in_frag = fragment.getNumTuples(); + total_row_count += rows_in_frag; + } + + const auto& type_width = col_info->type->size(); + auto write_ptr = + executor_->row_set_mem_owner_->allocate(type_width * total_row_count); + std::vector> write_ptrs; + for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) { + const auto& fragment = (*fragments)[frag_id]; + if (!fragment.getNumTuples()) { + continue; + } + CHECK_EQ(type_width, fragment.getChunkMetadataMap().at(col_id)->type()->size()); + write_ptrs.push_back({write_ptr, fragment.getNumTuples() * type_width}); + write_ptr += fragment.getNumTuples() * type_width; + } + + if (write_ptrs.empty()) { + std::unique_ptr merged_results(nullptr); + table_column = merged_results.get(); columnarized_scan_table_cache_.emplace(std::make_pair(table_id, col_id), std::move(merged_results)); - } else { - table_column = column_it->second.get(); + + return ColumnFetcher::transferColumnIfNeeded( + table_column, 0, memory_level, device_id, device_allocator); } + + CHECK_EQ(frag_count, write_ptrs.size()); + for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) { + std::list> chunk_holder; + std::list chunk_iter_holder; + const auto& fragment = (*fragments)[frag_id]; + if (fragment.isEmptyPhysicalFragment()) { + continue; + } + auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id); + CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end()); + std::shared_ptr chunk; + // Fixed length arrays are also included here. + const bool is_varlen = col_info->type->isString() || col_info->type->isArray(); + int8_t* col_buffer; + { + ChunkKey chunk_key{db_id, fragment.physicalTableId, col_id, fragment.fragmentId}; + std::unique_ptr> varlen_chunk_lock; + if (is_varlen) { + varlen_chunk_lock.reset( + new std::lock_guard(varlen_chunk_fetch_mutex_)); + } + chunk = data_provider_->getChunk(col_info, + chunk_key, + Data_Namespace::CPU_LEVEL, + 0, + chunk_meta_it->second->numBytes(), + chunk_meta_it->second->numElements()); + std::lock_guard chunk_list_lock(chunk_list_mutex_); + chunk_holder.push_back(chunk); + } + if (is_varlen) { + CHECK_GT(table_id, 0); + CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end()); + chunk_iter_holder.push_back(chunk->begin_iterator(chunk_meta_it->second)); + auto& chunk_iter = chunk_iter_holder.back(); + col_buffer = reinterpret_cast(&chunk_iter); + } else { + auto ab = chunk->getBuffer(); + CHECK(ab->getMemoryPtr()); + col_buffer = ab->getMemoryPtr(); // @TODO(alex) change to use ChunkIter + } + memcpy(write_ptrs[frag_id].first, col_buffer, write_ptrs[frag_id].second); + } + + std::vector raw_write_ptrs; + raw_write_ptrs.reserve(frag_count); + for (uint i = 0; i < frag_count; i++) { + raw_write_ptrs.emplace_back(write_ptrs[i].first); + } + + std::unique_ptr merged_results( + new ColumnarResults(raw_write_ptrs, total_row_count, col_info->type, thread_idx)); + + table_column = merged_results.get(); + columnarized_scan_table_cache_.emplace(std::make_pair(table_id, col_id), + std::move(merged_results)); } return ColumnFetcher::transferColumnIfNeeded( table_column, 0, memory_level, device_id, device_allocator); diff --git a/omniscidb/ResultSetRegistry/ColumnarResults.cpp b/omniscidb/ResultSetRegistry/ColumnarResults.cpp index d562aa186..a22924e85 100644 --- a/omniscidb/ResultSetRegistry/ColumnarResults.cpp +++ b/omniscidb/ResultSetRegistry/ColumnarResults.cpp @@ -122,28 +122,6 @@ ColumnarResults::ColumnarResults(std::shared_ptr row_set_mem_ } } -ColumnarResults::ColumnarResults(std::shared_ptr row_set_mem_owner, - const int8_t* one_col_buffer, - const size_t num_rows, - const hdk::ir::Type* target_type, - const size_t thread_idx) - : column_buffers_(1) - , num_rows_(num_rows) - , target_types_{target_type} - , parallel_conversion_(false) - , direct_columnar_conversion_(false) - , thread_idx_(thread_idx) { - auto timer = DEBUG_TIMER(__func__); - - if (target_type->isVarLen()) { - throw ColumnarConversionNotSupported(); - } - const auto buf_size = num_rows * target_type->size(); - column_buffers_[0] = - reinterpret_cast(row_set_mem_owner->allocate(buf_size, thread_idx_)); - memcpy(((void*)column_buffers_[0]), one_col_buffer, buf_size); -} - ColumnarResults::ColumnarResults(const std::vector one_col_buffer, const size_t num_rows, const hdk::ir::Type* target_type, diff --git a/omniscidb/ResultSetRegistry/ColumnarResults.h b/omniscidb/ResultSetRegistry/ColumnarResults.h index ced7ac30e..91eb97dc8 100644 --- a/omniscidb/ResultSetRegistry/ColumnarResults.h +++ b/omniscidb/ResultSetRegistry/ColumnarResults.h @@ -67,12 +67,6 @@ class ColumnarResults { const Config& config, const bool is_parallel_execution_enforced = false); - ColumnarResults(const std::shared_ptr row_set_mem_owner, - const int8_t* one_col_buffer, - const size_t num_rows, - const hdk::ir::Type* target_type, - const size_t thread_idx); - ColumnarResults(const std::vector one_col_buffer, const size_t num_rows, const hdk::ir::Type* target_type,