Skip to content
This repository has been archived by the owner on May 9, 2024. It is now read-only.

Commit

Permalink
[Join] Parallelization.
Browse files Browse the repository at this point in the history
This commit adds parallelization.

Resolves: #574

Signed-off-by: Dmitrii Makarenko <[email protected]>
  • Loading branch information
Devjiu committed Aug 24, 2023
1 parent bbc1bb1 commit b9af66e
Showing 1 changed file with 50 additions and 44 deletions.
94 changes: 50 additions & 44 deletions omniscidb/QueryEngine/ColumnFetcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

#include "QueryEngine/ColumnFetcher.h"

//#include <tbb/parallel_for.h>
#include <tbb/parallel_for.h>
#include <memory>

#include "DataMgr/ArrayNoneEncoder.h"
Expand Down Expand Up @@ -284,7 +284,7 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments(
const auto& type_width = col_info->type->size();
auto write_ptr =
executor_->row_set_mem_owner_->allocate(type_width * total_row_count);
std::vector<std::pair<int8_t*, size_t>> write_ptrs;
tbb::concurrent_vector<std::pair<int8_t*, size_t>> write_ptrs;
for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
const auto& fragment = (*fragments)[frag_id];
if (!fragment.getNumTuples()) {
Expand All @@ -307,48 +307,54 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments(
}

CHECK_EQ(frag_count, write_ptrs.size());
for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;
std::list<ChunkIter> chunk_iter_holder;
const auto& fragment = (*fragments)[frag_id];
if (fragment.isEmptyPhysicalFragment()) {
continue;
}
auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
std::shared_ptr<Chunk_NS::Chunk> chunk;
// Fixed length arrays are also included here.
const bool is_varlen = col_info->type->isString() || col_info->type->isArray();
int8_t* col_buffer;
{
ChunkKey chunk_key{db_id, fragment.physicalTableId, col_id, fragment.fragmentId};
std::unique_ptr<std::lock_guard<std::mutex>> varlen_chunk_lock;
if (is_varlen) {
varlen_chunk_lock.reset(
new std::lock_guard<std::mutex>(varlen_chunk_fetch_mutex_));
}
chunk = data_provider_->getChunk(col_info,
chunk_key,
Data_Namespace::CPU_LEVEL,
0,
chunk_meta_it->second->numBytes(),
chunk_meta_it->second->numElements());
std::lock_guard<std::mutex> chunk_list_lock(chunk_list_mutex_);
chunk_holder.push_back(chunk);
}
if (is_varlen) {
CHECK_GT(table_id, 0);
CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
chunk_iter_holder.push_back(chunk->begin_iterator(chunk_meta_it->second));
auto& chunk_iter = chunk_iter_holder.back();
col_buffer = reinterpret_cast<int8_t*>(&chunk_iter);
} else {
auto ab = chunk->getBuffer();
CHECK(ab->getMemoryPtr());
col_buffer = ab->getMemoryPtr(); // @TODO(alex) change to use ChunkIter
}
memcpy(write_ptrs[frag_id].first, col_buffer, write_ptrs[frag_id].second);
}
tbb::parallel_for(
tbb::blocked_range<size_t>(0, frag_count),
[&](const tbb::blocked_range<size_t>& frag_ids) {
for (size_t frag_id = frag_ids.begin(); frag_id < frag_ids.end(); ++frag_id) {
std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;
std::list<ChunkIter> chunk_iter_holder;
const auto& fragment = (*fragments)[frag_id];
if (fragment.isEmptyPhysicalFragment()) {
continue;
}
auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
std::shared_ptr<Chunk_NS::Chunk> chunk;
// Fixed length arrays are also included here.
const bool is_varlen =
col_info->type->isString() || col_info->type->isArray();
int8_t* col_buffer;
{
ChunkKey chunk_key{
db_id, fragment.physicalTableId, col_id, fragment.fragmentId};
std::unique_ptr<std::lock_guard<std::mutex>> varlen_chunk_lock;
if (is_varlen) {
varlen_chunk_lock.reset(
new std::lock_guard<std::mutex>(varlen_chunk_fetch_mutex_));
}
chunk = data_provider_->getChunk(col_info,
chunk_key,
Data_Namespace::CPU_LEVEL,
0,
chunk_meta_it->second->numBytes(),
chunk_meta_it->second->numElements());
std::lock_guard<std::mutex> chunk_list_lock(chunk_list_mutex_);
chunk_holder.push_back(chunk);
}
if (is_varlen) {
CHECK_GT(table_id, 0);
CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
chunk_iter_holder.push_back(chunk->begin_iterator(chunk_meta_it->second));
auto& chunk_iter = chunk_iter_holder.back();
col_buffer = reinterpret_cast<int8_t*>(&chunk_iter);
} else {
auto ab = chunk->getBuffer();
CHECK(ab->getMemoryPtr());
col_buffer = ab->getMemoryPtr(); // @TODO(alex) change to use ChunkIter
}
memcpy(write_ptrs[frag_id].first, col_buffer, write_ptrs[frag_id].second);
}
});

std::vector<int8_t*> raw_write_ptrs;
raw_write_ptrs.reserve(frag_count);
Expand Down

0 comments on commit b9af66e

Please sign in to comment.