Skip to content
This repository has been archived by the owner on May 9, 2024. It is now read-only.

Commit

Permalink
[Join] Remove redundant copies.
Browse files Browse the repository at this point in the history
This commit removes redundant copying (`memcpy`) in `getAllTableColumnFragments`.
It also adds some parallelization.

Resolves: #574

Signed-off-by: Dmitrii Makarenko <[email protected]>
  • Loading branch information
Devjiu committed Aug 24, 2023
1 parent 9d026ff commit d33c427
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 62 deletions.
130 changes: 96 additions & 34 deletions omniscidb/QueryEngine/ColumnFetcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "QueryEngine/ColumnFetcher.h"

//#include <tbb/parallel_for.h>
#include <memory>

#include "DataMgr/ArrayNoneEncoder.h"
Expand Down Expand Up @@ -239,16 +240,17 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments(
int db_id = col_info->db_id;
int table_id = col_info->table_id;
int col_id = col_info->column_id;

const auto fragments_it = all_tables_fragments.find({db_id, table_id});
CHECK(fragments_it != all_tables_fragments.end());

const auto fragments = fragments_it->second;
const auto frag_count = fragments->size();
std::vector<std::unique_ptr<ColumnarResults>> column_frags;
const ColumnarResults* table_column = nullptr;
const InputDescriptor table_desc(db_id, table_id, int(0));
{
std::lock_guard<std::mutex> columnar_conversion_guard(columnar_fetch_mutex_);

auto col_token = data_provider_->getZeroCopyColumnData(*col_info);
if (col_token != nullptr) {
size_t num_rows = col_token->getSize() / col_token->getType()->size();
Expand All @@ -262,44 +264,104 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments(
}

auto column_it = columnarized_scan_table_cache_.find({table_id, col_id});
if (column_it == columnarized_scan_table_cache_.end()) {
for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
if (executor_->getConfig()
.exec.interrupt.enable_non_kernel_time_query_interrupt &&
executor_->checkNonKernelTimeInterrupted()) {
throw QueryExecutionError(Executor::ERR_INTERRUPTED);
}
std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;
std::list<ChunkIter> chunk_iter_holder;
const auto& fragment = (*fragments)[frag_id];
if (fragment.isEmptyPhysicalFragment()) {
continue;
}
auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
auto col_buffer = getOneTableColumnFragment(col_info,
static_cast<int>(frag_id),
all_tables_fragments,
chunk_holder,
chunk_iter_holder,
Data_Namespace::CPU_LEVEL,
int(0),
device_allocator);
column_frags.push_back(
std::make_unique<ColumnarResults>(executor_->row_set_mem_owner_,
col_buffer,
fragment.getNumTuples(),
chunk_meta_it->second->type(),
thread_idx));
if (column_it != columnarized_scan_table_cache_.end()) {
table_column = column_it->second.get();
return ColumnFetcher::transferColumnIfNeeded(
table_column, 0, memory_level, device_id, device_allocator);
}

size_t total_row_count = 0;
for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
if (executor_->getConfig().exec.interrupt.enable_non_kernel_time_query_interrupt &&
executor_->checkNonKernelTimeInterrupted()) {
throw QueryExecutionError(Executor::ERR_INTERRUPTED);
}
auto merged_results =
ColumnarResults::mergeResults(executor_->row_set_mem_owner_, column_frags);
const auto& fragment = (*fragments)[frag_id];
const auto& rows_in_frag = fragment.getNumTuples();
total_row_count += rows_in_frag;
}

const auto& type_width = col_info->type->size();
auto write_ptr =
executor_->row_set_mem_owner_->allocate(type_width * total_row_count);
std::vector<std::pair<int8_t*, size_t>> write_ptrs;
for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
const auto& fragment = (*fragments)[frag_id];
if (!fragment.getNumTuples()) {
continue;
}
CHECK_EQ(type_width, fragment.getChunkMetadataMap().at(col_id)->type()->size());
write_ptrs.push_back({write_ptr, fragment.getNumTuples() * type_width});
write_ptr += fragment.getNumTuples() * type_width;
}

if (write_ptrs.empty()) {
std::unique_ptr<ColumnarResults> merged_results(nullptr);

table_column = merged_results.get();
columnarized_scan_table_cache_.emplace(std::make_pair(table_id, col_id),
std::move(merged_results));
} else {
table_column = column_it->second.get();

return ColumnFetcher::transferColumnIfNeeded(
table_column, 0, memory_level, device_id, device_allocator);
}

CHECK_EQ(frag_count, write_ptrs.size());
for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;
std::list<ChunkIter> chunk_iter_holder;
const auto& fragment = (*fragments)[frag_id];
if (fragment.isEmptyPhysicalFragment()) {
continue;
}
auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
std::shared_ptr<Chunk_NS::Chunk> chunk;
// Fixed length arrays are also included here.
const bool is_varlen = col_info->type->isString() || col_info->type->isArray();
int8_t* col_buffer;
{
ChunkKey chunk_key{db_id, fragment.physicalTableId, col_id, fragment.fragmentId};
std::unique_ptr<std::lock_guard<std::mutex>> varlen_chunk_lock;
if (is_varlen) {
varlen_chunk_lock.reset(
new std::lock_guard<std::mutex>(varlen_chunk_fetch_mutex_));
}
chunk = data_provider_->getChunk(col_info,
chunk_key,
Data_Namespace::CPU_LEVEL,
0,
chunk_meta_it->second->numBytes(),
chunk_meta_it->second->numElements());
std::lock_guard<std::mutex> chunk_list_lock(chunk_list_mutex_);
chunk_holder.push_back(chunk);
}
if (is_varlen) {
CHECK_GT(table_id, 0);
CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
chunk_iter_holder.push_back(chunk->begin_iterator(chunk_meta_it->second));
auto& chunk_iter = chunk_iter_holder.back();
col_buffer = reinterpret_cast<int8_t*>(&chunk_iter);
} else {
auto ab = chunk->getBuffer();
CHECK(ab->getMemoryPtr());
col_buffer = ab->getMemoryPtr(); // @TODO(alex) change to use ChunkIter
}
memcpy(write_ptrs[frag_id].first, col_buffer, write_ptrs[frag_id].second);
}

std::vector<int8_t*> raw_write_ptrs;
raw_write_ptrs.reserve(frag_count);
for (uint i = 0; i < frag_count; i++) {
raw_write_ptrs.emplace_back(write_ptrs[i].first);
}

std::unique_ptr<ColumnarResults> merged_results(
new ColumnarResults(raw_write_ptrs, total_row_count, col_info->type, thread_idx));

table_column = merged_results.get();
columnarized_scan_table_cache_.emplace(std::make_pair(table_id, col_id),
std::move(merged_results));
}
return ColumnFetcher::transferColumnIfNeeded(
table_column, 0, memory_level, device_id, device_allocator);
Expand Down
22 changes: 0 additions & 22 deletions omniscidb/ResultSetRegistry/ColumnarResults.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,28 +122,6 @@ ColumnarResults::ColumnarResults(std::shared_ptr<RowSetMemoryOwner> row_set_mem_
}
}

// Builds a single-column ColumnarResults by duplicating `one_col_buffer`
// (num_rows * target_type->size() bytes) into memory owned by
// `row_set_mem_owner`. Variable-length types are rejected because they
// cannot be represented as one flat fixed-width buffer.
ColumnarResults::ColumnarResults(std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
                                 const int8_t* one_col_buffer,
                                 const size_t num_rows,
                                 const hdk::ir::Type* target_type,
                                 const size_t thread_idx)
    : column_buffers_(1)
    , num_rows_(num_rows)
    , target_types_{target_type}
    , parallel_conversion_(false)
    , direct_columnar_conversion_(false)
    , thread_idx_(thread_idx) {
  auto timer = DEBUG_TIMER(__func__);

  // Flat copy below only works for fixed-width payloads.
  if (target_type->isVarLen()) {
    throw ColumnarConversionNotSupported();
  }
  const size_t bytes_to_copy = num_rows * target_type->size();
  // The owner tracks the allocation per thread; this object never frees it.
  auto* dst = reinterpret_cast<int8_t*>(
      row_set_mem_owner->allocate(bytes_to_copy, thread_idx_));
  memcpy(dst, one_col_buffer, bytes_to_copy);
  column_buffers_[0] = dst;
}

ColumnarResults::ColumnarResults(const std::vector<int8_t*> one_col_buffer,
const size_t num_rows,
const hdk::ir::Type* target_type,
Expand Down
6 changes: 0 additions & 6 deletions omniscidb/ResultSetRegistry/ColumnarResults.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,6 @@ class ColumnarResults {
const Config& config,
const bool is_parallel_execution_enforced = false);

// Constructs a single-column result by copying `one_col_buffer`
// (num_rows * target_type->size() bytes) into memory allocated from
// `row_set_mem_owner`; throws ColumnarConversionNotSupported for
// variable-length target types.
ColumnarResults(const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
const int8_t* one_col_buffer,
const size_t num_rows,
const hdk::ir::Type* target_type,
const size_t thread_idx);

ColumnarResults(const std::vector<int8_t*> one_col_buffer,
const size_t num_rows,
const hdk::ir::Type* target_type,
Expand Down

0 comments on commit d33c427

Please sign in to comment.