From 2eb556399ebfd21ba201cc5fb9ed7fb688ad0c8e Mon Sep 17 00:00:00 2001
From: Dmitrii Makarenko
Date: Tue, 8 Aug 2023 13:40:22 +0000
Subject: [PATCH] [Join] Parallelization.

This commit parallelizes ColumnFetcher::getAllTableColumnFragments:
per-fragment chunk fetching and copying into the output buffer now run
under tbb::parallel_for instead of a serial loop.

Resolves: #574

Signed-off-by: Dmitrii Makarenko
---
 omniscidb/QueryEngine/ColumnFetcher.cpp | 94 +++++++++++++------------
 1 file changed, 50 insertions(+), 44 deletions(-)

diff --git a/omniscidb/QueryEngine/ColumnFetcher.cpp b/omniscidb/QueryEngine/ColumnFetcher.cpp
index 0469b8c45..dc2a3a3b4 100644
--- a/omniscidb/QueryEngine/ColumnFetcher.cpp
+++ b/omniscidb/QueryEngine/ColumnFetcher.cpp
@@ -16,7 +16,7 @@
 
 #include "QueryEngine/ColumnFetcher.h"
 
-//#include <tbb/parallel_for.h>
+#include <tbb/parallel_for.h>
 #include <memory>
 
 #include "DataMgr/ArrayNoneEncoder.h"
@@ -284,7 +284,7 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments(
   const auto& type_width = col_info->type->size();
   auto write_ptr =
       executor_->row_set_mem_owner_->allocate(type_width * total_row_count);
-  std::vector<std::pair<int8_t*, size_t>> write_ptrs;
+  tbb::concurrent_vector<std::pair<int8_t*, size_t>> write_ptrs;
   for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
     const auto& fragment = (*fragments)[frag_id];
     if (!fragment.getNumTuples()) {
@@ -307,48 +307,54 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments(
   }
   CHECK_EQ(frag_count, write_ptrs.size());
 
-  for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
-    std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;
-    std::list<ChunkIter> chunk_iter_holder;
-    const auto& fragment = (*fragments)[frag_id];
-    if (fragment.isEmptyPhysicalFragment()) {
-      continue;
-    }
-    auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
-    CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
-    std::shared_ptr<Chunk_NS::Chunk> chunk;
-    // Fixed length arrays are also included here.
-    const bool is_varlen = col_info->type->isString() || col_info->type->isArray();
-    int8_t* col_buffer;
-    {
-      ChunkKey chunk_key{db_id, fragment.physicalTableId, col_id, fragment.fragmentId};
-      std::unique_ptr<std::lock_guard<std::mutex>> varlen_chunk_lock;
-      if (is_varlen) {
-        varlen_chunk_lock.reset(
-            new std::lock_guard<std::mutex>(varlen_chunk_fetch_mutex_));
-      }
-      chunk = data_provider_->getChunk(col_info,
-                                       chunk_key,
-                                       Data_Namespace::CPU_LEVEL,
-                                       0,
-                                       chunk_meta_it->second->numBytes(),
-                                       chunk_meta_it->second->numElements());
-      std::lock_guard<std::mutex> chunk_list_lock(chunk_list_mutex_);
-      chunk_holder.push_back(chunk);
-    }
-    if (is_varlen) {
-      CHECK_GT(table_id, 0);
-      CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
-      chunk_iter_holder.push_back(chunk->begin_iterator(chunk_meta_it->second));
-      auto& chunk_iter = chunk_iter_holder.back();
-      col_buffer = reinterpret_cast<int8_t*>(&chunk_iter);
-    } else {
-      auto ab = chunk->getBuffer();
-      CHECK(ab->getMemoryPtr());
-      col_buffer = ab->getMemoryPtr();  // @TODO(alex) change to use ChunkIter
-    }
-    memcpy(write_ptrs[frag_id].first, col_buffer, write_ptrs[frag_id].second);
-  }
+  tbb::parallel_for(
+      tbb::blocked_range<size_t>(0, frag_count),
+      [&](const tbb::blocked_range<size_t>& frag_ids) {
+        for (size_t frag_id = frag_ids.begin(); frag_id < frag_ids.end(); ++frag_id) {
+          std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;
+          std::list<ChunkIter> chunk_iter_holder;
+          const auto& fragment = (*fragments)[frag_id];
+          if (fragment.isEmptyPhysicalFragment()) {
+            continue;
+          }
+          auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
+          CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
+          std::shared_ptr<Chunk_NS::Chunk> chunk;
+          // Fixed length arrays are also included here.
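+          // Each iteration fetches its own chunk and copies it into a
+          // pre-allocated slot of write_ptrs, so the copies need no locking;
+          // varlen chunk fetches are serialized via varlen_chunk_fetch_mutex_.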
+          const bool is_varlen =
+              col_info->type->isString() || col_info->type->isArray();
+          int8_t* col_buffer;
+          {
+            ChunkKey chunk_key{
+                db_id, fragment.physicalTableId, col_id, fragment.fragmentId};
+            std::unique_ptr<std::lock_guard<std::mutex>> varlen_chunk_lock;
+            if (is_varlen) {
+              varlen_chunk_lock.reset(
+                  new std::lock_guard<std::mutex>(varlen_chunk_fetch_mutex_));
+            }
+            chunk = data_provider_->getChunk(col_info,
+                                             chunk_key,
+                                             Data_Namespace::CPU_LEVEL,
+                                             0,
+                                             chunk_meta_it->second->numBytes(),
+                                             chunk_meta_it->second->numElements());
+            std::lock_guard<std::mutex> chunk_list_lock(chunk_list_mutex_);
+            chunk_holder.push_back(chunk);
+          }
+          if (is_varlen) {
+            CHECK_GT(table_id, 0);
+            CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
+            chunk_iter_holder.push_back(chunk->begin_iterator(chunk_meta_it->second));
+            auto& chunk_iter = chunk_iter_holder.back();
+            col_buffer = reinterpret_cast<int8_t*>(&chunk_iter);
+          } else {
+            auto ab = chunk->getBuffer();
+            CHECK(ab->getMemoryPtr());
+            col_buffer = ab->getMemoryPtr();  // @TODO(alex) change to use ChunkIter
+          }
+          memcpy(write_ptrs[frag_id].first, col_buffer, write_ptrs[frag_id].second);
+        }
+      });
 
   std::vector<int8_t*> raw_write_ptrs;
   raw_write_ptrs.reserve(frag_count);