From f363014879bdd637dd5b26fffbbb40285f70446a Mon Sep 17 00:00:00 2001 From: Dmitrii Makarenko Date: Tue, 1 Aug 2023 06:58:50 +0000 Subject: [PATCH] [Join] Inline and parallelize tbb in getAllTableColumnFragments. This commit refactors and simplifies method `getAllTableColumnFragments`. Also some parallelization added. Partially resolves: #574 Signed-off-by: Dmitrii Makarenko --- omniscidb/QueryEngine/ColumnFetcher.cpp | 126 +++++++++++++----- .../ResultSetRegistry/ColumnarResults.cpp | 22 --- omniscidb/ResultSetRegistry/ColumnarResults.h | 6 - 3 files changed, 89 insertions(+), 65 deletions(-) diff --git a/omniscidb/QueryEngine/ColumnFetcher.cpp b/omniscidb/QueryEngine/ColumnFetcher.cpp index 20a73868f..62fd71f33 100644 --- a/omniscidb/QueryEngine/ColumnFetcher.cpp +++ b/omniscidb/QueryEngine/ColumnFetcher.cpp @@ -16,8 +16,6 @@ #include "QueryEngine/ColumnFetcher.h" -#include - #include "DataMgr/ArrayNoneEncoder.h" #include "QueryEngine/ErrorHandling.h" #include "QueryEngine/Execute.h" @@ -25,6 +23,9 @@ #include "Shared/likely.h" #include "Shared/sqltypes.h" +#include +#include + namespace { std::string getMemoryLevelString(Data_Namespace::MemoryLevel memoryLevel) { @@ -239,6 +240,11 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments( int db_id = col_info->db_id; int table_id = col_info->table_id; int col_id = col_info->column_id; + + // Array type passed to getAllTableColumnFragments. Should be handled in + // linearization. + CHECK(!col_info->type->isString() && !col_info->type->isArray()); + const auto fragments_it = all_tables_fragments.find({db_id, table_id}); CHECK(fragments_it != all_tables_fragments.end()); const auto fragments = fragments_it->second; @@ -248,7 +254,6 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments( const InputDescriptor table_desc(db_id, table_id, int(0)); { std::lock_guard columnar_conversion_guard(columnar_fetch_mutex_); - auto col_token = data_provider_->getZeroCopyColumnData(*col_info); if (col_token != nullptr) { size_t num_rows = col_token->getSize() / col_token->getType()->size(); @@ -262,44 +267,91 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments( } auto column_it = columnarized_scan_table_cache_.find({table_id, col_id}); - if (column_it == columnarized_scan_table_cache_.end()) { - for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) { - if (executor_->getConfig() - .exec.interrupt.enable_non_kernel_time_query_interrupt && - executor_->checkNonKernelTimeInterrupted()) { - throw QueryExecutionError(Executor::ERR_INTERRUPTED); - } - std::list> chunk_holder; - std::list chunk_iter_holder; - const auto& fragment = (*fragments)[frag_id]; - if (fragment.isEmptyPhysicalFragment()) { - continue; - } - auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id); - CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end()); - auto col_buffer = getOneTableColumnFragment(col_info, - static_cast(frag_id), - all_tables_fragments, - chunk_holder, - chunk_iter_holder, - Data_Namespace::CPU_LEVEL, - int(0), - device_allocator); - column_frags.push_back( - std::make_unique(executor_->row_set_mem_owner_, - col_buffer, - fragment.getNumTuples(), - chunk_meta_it->second->type(), - thread_idx)); - } - auto merged_results = - ColumnarResults::mergeResults(executor_->row_set_mem_owner_, column_frags); + if (column_it != columnarized_scan_table_cache_.end()) { + table_column = column_it->second.get(); + return ColumnFetcher::transferColumnIfNeeded( + table_column, 0, memory_level, device_id, device_allocator); + } + + if (executor_->getConfig().exec.interrupt.enable_non_kernel_time_query_interrupt && + executor_->checkNonKernelTimeInterrupted()) { + throw QueryExecutionError(Executor::ERR_INTERRUPTED); + } + + size_t total_row_count = 0; + for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) { + const auto& fragment = (*fragments)[frag_id]; + const auto rows_in_frag = fragment.getNumTuples(); + total_row_count += rows_in_frag; + } + + if (total_row_count == 0) { + std::unique_ptr merged_results(nullptr); + table_column = merged_results.get(); columnarized_scan_table_cache_.emplace(std::make_pair(table_id, col_id), std::move(merged_results)); - } else { - table_column = column_it->second.get(); + + return ColumnFetcher::transferColumnIfNeeded( + table_column, 0, memory_level, device_id, device_allocator); + } + + const auto type_width = col_info->type->size(); + auto write_ptr = + executor_->row_set_mem_owner_->allocate(type_width * total_row_count); + std::vector> write_ptrs; + std::vector valid_fragments; + for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) { + const auto& fragment = (*fragments)[frag_id]; + if (fragment.isEmptyPhysicalFragment()) { + continue; + } + CHECK_EQ(type_width, fragment.getChunkMetadataMap().at(col_id)->type()->size()); + write_ptrs.push_back({write_ptr, fragment.getNumTuples() * type_width}); + write_ptr += fragment.getNumTuples() * type_width; + valid_fragments.push_back(frag_id); } + + CHECK(!write_ptrs.empty()); + size_t valid_frag_count = valid_fragments.size(); + tbb::parallel_for( + tbb::blocked_range(0, valid_frag_count), + [&](const tbb::blocked_range& frag_ids) { + for (size_t v_frag_id = frag_ids.begin(); v_frag_id < frag_ids.end(); + ++v_frag_id) { + std::list> chunk_holder; + std::list chunk_iter_holder; + size_t frag_id = valid_fragments[v_frag_id]; + const auto& fragment = (*fragments)[frag_id]; + auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id); + CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end()); + std::shared_ptr chunk; + { + ChunkKey chunk_key{ + db_id, fragment.physicalTableId, col_id, fragment.fragmentId}; + chunk = data_provider_->getChunk(col_info, + chunk_key, + Data_Namespace::CPU_LEVEL, + 0, + chunk_meta_it->second->numBytes(), + chunk_meta_it->second->numElements()); + std::lock_guard chunk_list_lock(chunk_list_mutex_); + chunk_holder.push_back(chunk); + } + auto ab = chunk->getBuffer(); + CHECK(ab->getMemoryPtr()); + int8_t* col_buffer = + ab->getMemoryPtr(); // @TODO(alex) change to use ChunkIter + memcpy(write_ptrs[frag_id].first, col_buffer, write_ptrs[frag_id].second); + } + }); + + std::unique_ptr merged_results(new ColumnarResults( + {write_ptrs[0].first}, total_row_count, col_info->type, thread_idx)); + + table_column = merged_results.get(); + columnarized_scan_table_cache_.emplace(std::make_pair(table_id, col_id), + std::move(merged_results)); } return ColumnFetcher::transferColumnIfNeeded( table_column, 0, memory_level, device_id, device_allocator); diff --git a/omniscidb/ResultSetRegistry/ColumnarResults.cpp b/omniscidb/ResultSetRegistry/ColumnarResults.cpp index d562aa186..a22924e85 100644 --- a/omniscidb/ResultSetRegistry/ColumnarResults.cpp +++ b/omniscidb/ResultSetRegistry/ColumnarResults.cpp @@ -122,28 +122,6 @@ ColumnarResults::ColumnarResults(std::shared_ptr row_set_mem_ } } -ColumnarResults::ColumnarResults(std::shared_ptr row_set_mem_owner, - const int8_t* one_col_buffer, - const size_t num_rows, - const hdk::ir::Type* target_type, - const size_t thread_idx) - : column_buffers_(1) - , num_rows_(num_rows) - , target_types_{target_type} - , parallel_conversion_(false) - , direct_columnar_conversion_(false) - , thread_idx_(thread_idx) { - auto timer = DEBUG_TIMER(__func__); - - if (target_type->isVarLen()) { - throw ColumnarConversionNotSupported(); - } - const auto buf_size = num_rows * target_type->size(); - column_buffers_[0] = - reinterpret_cast(row_set_mem_owner->allocate(buf_size, thread_idx_)); - memcpy(((void*)column_buffers_[0]), one_col_buffer, buf_size); -} - ColumnarResults::ColumnarResults(const std::vector one_col_buffer, const size_t num_rows, const hdk::ir::Type* target_type, diff --git a/omniscidb/ResultSetRegistry/ColumnarResults.h b/omniscidb/ResultSetRegistry/ColumnarResults.h index ced7ac30e..91eb97dc8 100644 --- a/omniscidb/ResultSetRegistry/ColumnarResults.h +++ b/omniscidb/ResultSetRegistry/ColumnarResults.h @@ -67,12 +67,6 @@ class ColumnarResults { const Config& config, const bool is_parallel_execution_enforced = false); - ColumnarResults(const std::shared_ptr row_set_mem_owner, - const int8_t* one_col_buffer, - const size_t num_rows, - const hdk::ir::Type* target_type, - const size_t thread_idx); - ColumnarResults(const std::vector one_col_buffer, const size_t num_rows, const hdk::ir::Type* target_type,