This repository has been archived by the owner on May 9, 2024. It is now read-only.

Join perf analysis #579

Closed
wants to merge 8 commits
14 changes: 13 additions & 1 deletion omniscidb/ArrowStorage/ArrowStorage.cpp
@@ -87,11 +87,14 @@ size_t columnIndex(int col_id) {
void ArrowStorage::fetchBuffer(const ChunkKey& key,
Data_Namespace::AbstractBuffer* dest,
const size_t num_bytes) {
auto timer = DEBUG_TIMER("ArrowStorage Fetch Buffer");
mapd_shared_lock<mapd_shared_mutex> data_lock(data_mutex_);
auto timer2 = DEBUG_TIMER("ArrowStorage Data Lock taken");
CHECK_EQ(key[CHUNK_KEY_DB_IDX], db_id_);
CHECK_EQ(tables_.count(key[CHUNK_KEY_TABLE_IDX]), (size_t)1);
auto& table = *tables_.at(key[CHUNK_KEY_TABLE_IDX]);
mapd_shared_lock<mapd_shared_mutex> table_lock(table.mutex);
auto timer3 = DEBUG_TIMER("ArrowStorage Table Lock taken");
data_lock.unlock();

size_t col_idx = columnIndex(key[CHUNK_KEY_COLUMN_IDX]);
@@ -107,17 +110,25 @@ void ArrowStorage::fetchBuffer(const ChunkKey& key,
if (!col_type->isVarLen()) {
CHECK_EQ(key.size(), (size_t)4);
size_t elem_size = col_type->size();
fetchFixedLenData(table, frag_idx, col_idx, dest, num_bytes, elem_size);
if (col_type->isString()) {
auto timer4 = DEBUG_TIMER("ArrowStorage Str Fix len Fetch");
fetchFixedLenData(table, frag_idx, col_idx, dest, num_bytes, elem_size);
} else {
auto timer4 = DEBUG_TIMER("ArrowStorage NotStr Fix len Fetch");
fetchFixedLenData(table, frag_idx, col_idx, dest, num_bytes, elem_size);
}
} else {
CHECK_EQ(key.size(), (size_t)5);
if (key[CHUNK_KEY_VARLEN_IDX] == 1) {
if (!dest->hasEncoder()) {
dest->initEncoder(col_type);
}
if (col_type->isString()) {
auto timer4 = DEBUG_TIMER("ArrowStorage Str Var len Fetch");
fetchVarLenData(table, frag_idx, col_idx, dest, num_bytes);
} else {
CHECK(col_type->isVarLenArray());
auto timer4 = DEBUG_TIMER("ArrowStorage NotStr Var len Fetch");
fetchVarLenArrayData(table,
frag_idx,
col_idx,
@@ -127,6 +138,7 @@ void ArrowStorage::fetchBuffer(const ChunkKey& key,
}
} else {
CHECK_EQ(key[CHUNK_KEY_VARLEN_IDX], 2);
auto timer4 = DEBUG_TIMER("ArrowStorage Off Var len Fetch");
fetchVarLenOffsets(table, frag_idx, col_idx, dest, num_bytes);
}
}
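Note: the hunks above wrap the shared-lock acquisitions and each fetch path in nested DEBUG_TIMER scopes, so the resulting log separates time spent waiting on data_mutex_ and the per-table mutex from time spent actually copying chunk data. Below is a minimal standalone sketch of that scoped-timer idea; ScopedTimer is a hypothetical stand-in, not the repository's DEBUG_TIMER implementation.

#include <chrono>
#include <iostream>
#include <string>

class ScopedTimer {
 public:
  explicit ScopedTimer(std::string label)
      : label_(std::move(label)), start_(std::chrono::steady_clock::now()) {}
  ~ScopedTimer() {
    const auto us = std::chrono::duration_cast<std::chrono::microseconds>(
                        std::chrono::steady_clock::now() - start_)
                        .count();
    std::cerr << label_ << ": " << us << " us\n";  // reported when the scope ends
  }

 private:
  std::string label_;
  std::chrono::steady_clock::time_point start_;
};

// Usage mirroring the diff: one timer before the lock, one right after it is
// acquired; the gap between the two readings is the lock-wait time.
//   ScopedTimer total("fetchBuffer total");
//   mapd_shared_lock<mapd_shared_mutex> lock(mutex);  // may block
//   ScopedTimer after_lock("data lock taken");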
2 changes: 1 addition & 1 deletion omniscidb/Calcite/CMakeLists.txt
@@ -35,7 +35,7 @@ endif()
add_custom_command(
OUTPUT
${CMAKE_BINARY_DIR}/bin/calcite-1.0-SNAPSHOT-jar-with-dependencies.jar
COMMAND ${MVN_PATH_COMMAND} ${MVN_EXECUTABLE} -l ${CMAKE_BINARY_DIR}/mvn_build.log -e clean install -Dmaven.compiler.showDeprecation=true -Dmaven.compiler.showWarnings=true -Domnisci.release.version="${OMNISCI_JAR_RELEASE_VERSION}" -Djava.net.preferIPv4Stack=true -Dmaven.wagon.http.retryHandler.count=3 -DMAPD_LOG_DIR="${CMAKE_BINARY_DIR}"
COMMAND ${MVN_PATH_COMMAND} ${MVN_EXECUTABLE} -e clean install -Dmaven.compiler.showDeprecation=true -Dmaven.compiler.showWarnings=true -Domnisci.release.version="${OMNISCI_JAR_RELEASE_VERSION}" -Djava.net.preferIPv4Stack=true -Dmaven.wagon.http.retryHandler.count=3 -DMAPD_LOG_DIR="${CMAKE_BINARY_DIR}"
COMMAND ${MKDIR_COMMAND}
COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/java/calcite/target/calcite-1.0-SNAPSHOT-jar-with-dependencies.jar ${CMAKE_BINARY_DIR}/bin
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/java
5 changes: 5 additions & 0 deletions omniscidb/DataMgr/Chunk/Chunk.cpp
@@ -34,8 +34,13 @@ std::shared_ptr<Chunk> Chunk::getChunk(ColumnInfoPtr col_info,
const int deviceId,
const size_t numBytes,
const size_t numElems) {
std::ostringstream ss;
ss << __func__ << " getting chunk: " << col_info->toString();
LOG(WARNING) << ss.str();
auto timer = DEBUG_TIMER("ChunkNS getChunk ");
std::shared_ptr<Chunk> chunkp = std::make_shared<Chunk>(Chunk(col_info));
chunkp->getChunkBuffer(data_mgr, key, memoryLevel, deviceId, numBytes, numElems);
LOG(WARNING) << "[Debug Timer] col_info: " << col_info->toString();
return chunkp;
}

@@ -46,6 +46,11 @@ void PersistentStorageMgr::deleteBuffersWithPrefix(const ChunkKey& chunk_key_pre

AbstractBuffer* PersistentStorageMgr::getBuffer(const ChunkKey& chunk_key,
const size_t num_bytes) {
std::ostringstream ss;
ss << __func__ << " PersistentStrg: " << chunk_key.size() << "-" << chunk_key[0]
<< " b: " << num_bytes;
LOG(WARNING) << ss.str();
auto timer = DEBUG_TIMER("PersistentStrg getBuffer");
return getStorageMgrForTableKey(chunk_key)->getBuffer(chunk_key, num_bytes);
}

84 changes: 83 additions & 1 deletion omniscidb/QueryEngine/CardinalityEstimator.cpp
@@ -19,6 +19,19 @@
#include "ExpressionRewrite.h"
#include "RelAlgExecutor.h"

namespace {

class InputColumnsCollector
: public hdk::ir::ExprCollector<std::unordered_set<InputColDescriptor>,
InputColumnsCollector> {
protected:
void visitColumnVar(const hdk::ir::ColumnVar* col_var) override {
result_.insert(InputColDescriptor(col_var->columnInfo(), 0));
}
};

} // namespace

size_t RelAlgExecutor::getNDVEstimation(const WorkUnit& work_unit,
const int64_t range,
const bool is_agg,
@@ -101,8 +114,77 @@ RelAlgExecutionUnit create_ndv_execution_unit(const RelAlgExecutionUnit& ra_exe_
RelAlgExecutionUnit create_count_all_execution_unit(
const RelAlgExecutionUnit& ra_exe_unit,
hdk::ir::ExprPtr replacement_target) {
InputColumnsCollector input_columns_collector;
std::list<std::shared_ptr<const InputColDescriptor>> join_input_col_descs;
std::stringstream os{};
os << "\n[Count all] Join Quals: ";

for (const auto& join_condition : ra_exe_unit.join_quals) {
// const auto& join_condition = ra_exe_unit.join_quals[i];
os << "\t" << ::toString(join_condition.type);
for (const auto& q : join_condition.quals) {
if (!q)
break;
LOG(ERROR) << "jq visit";
input_columns_collector.visit(q.get());
os << q->toString() << ", ";
}
}
LOG(ERROR) << "join quals: " << os.str();

for (const auto& q : ra_exe_unit.quals) {
if (!q)
break;
LOG(ERROR) << " q visit";
input_columns_collector.visit(q.get());
os << q->toString() << ", ";
}
LOG(ERROR) << "quals: " << os.str();

for (const auto& q : ra_exe_unit.simple_quals) {
if (!q)
break;
LOG(ERROR) << "sq visit";
input_columns_collector.visit(q.get());
os << q->toString() << ", ";
}
LOG(ERROR) << "simple quals: " << os.str();

for (const auto& q : ra_exe_unit.groupby_exprs) {
if (!q)
break;
LOG(ERROR) << "gb visit";
input_columns_collector.visit(q.get());
os << q->toString() << ", ";
}
LOG(ERROR) << "groupby quals: " << os.str();

auto& input_column_descriptors = input_columns_collector.result();
for (auto& col_var : input_column_descriptors) {
LOG(ERROR) << "col_vars: " << col_var;
for (auto& icol : ra_exe_unit.input_col_descs) {
if (icol->getColId() == col_var.getColId() &&
icol->getTableId() == col_var.getTableId()) {
join_input_col_descs.emplace_back(icol);
}
}
}

if (join_input_col_descs.empty()) {
join_input_col_descs.insert(join_input_col_descs.end(),
ra_exe_unit.input_col_descs.begin(),
ra_exe_unit.input_col_descs.end());
}

std::stringstream js{};
js << "\n\t[Only Join] Table/Col/Levels: ";
for (const auto& input_col_desc : join_input_col_descs) {
js << "(" << input_col_desc->getTableId() << ", " << input_col_desc->getColId()
<< ", " << input_col_desc->getNestLevel() << ") ";
}
LOG(ERROR) << "join in cols: " << js.str();
return {ra_exe_unit.input_descs,
ra_exe_unit.input_col_descs,
join_input_col_descs,
ra_exe_unit.simple_quals,
ra_exe_unit.quals,
ra_exe_unit.join_quals,
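Note: the new InputColumnsCollector walks the join, filter, simple, and group-by expressions and records every column they reference, and create_count_all_execution_unit then keeps only the matching entries from ra_exe_unit.input_col_descs, falling back to the full list when nothing is collected. A simplified sketch of that collect-then-filter idea follows, using hypothetical stand-in types (Expr, ColumnRef) rather than hdk::ir::ExprCollector and InputColDescriptor.

#include <memory>
#include <set>
#include <utility>
#include <vector>

// Hypothetical, simplified expression tree; the real code visits hdk::ir nodes.
struct Expr {
  virtual ~Expr() = default;
  std::vector<std::shared_ptr<Expr>> children;
};

struct ColumnRef : Expr {
  int table_id = 0;
  int col_id = 0;
};

// Recursively collect every (table_id, col_id) pair referenced by an expression.
void collectColumns(const Expr* e, std::set<std::pair<int, int>>& out) {
  if (!e) {
    return;
  }
  if (const auto* col = dynamic_cast<const ColumnRef*>(e)) {
    out.insert({col->table_id, col->col_id});
  }
  for (const auto& child : e->children) {
    collectColumns(child.get(), out);
  }
}

Restricting the count-all unit to the columns its quals actually touch keeps the pre-flight row-count query from fetching and columnarizing every input column of a wide join, which appears to be the point of this change.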
31 changes: 29 additions & 2 deletions omniscidb/QueryEngine/ColumnFetcher.cpp
@@ -16,6 +16,7 @@

#include "QueryEngine/ColumnFetcher.h"

#include <tbb/parallel_for.h>
#include <memory>

#include "DataMgr/ArrayNoneEncoder.h"
@@ -172,7 +173,10 @@ const int8_t* ColumnFetcher::getOneTableColumnFragment(
std::list<ChunkIter>& chunk_iter_holder,
const Data_Namespace::MemoryLevel memory_level,
const int device_id,
DeviceAllocator* allocator) const {
DeviceAllocator* allocator,
const size_t thread_idx) const {
auto timer = DEBUG_TIMER(__func__);
INJECT_TIMER(getOneTableColumnFragment);
int db_id = col_info->db_id;
int table_id = col_info->table_id;
int col_id = col_info->column_id;
@@ -184,6 +188,10 @@ const int8_t* ColumnFetcher::getOneTableColumnFragment(
if (fragment.isEmptyPhysicalFragment()) {
return nullptr;
}
LOG(ERROR) << __func__ << " lock taken, execution started parent thread: " << thread_idx
<< "\n logger curr tid: " << logger::thread_id()
<< "\n scan_table tabid: " << table_id << " colid: " << col_id
<< "\n col_info: " << col_info->toString();
std::shared_ptr<Chunk_NS::Chunk> chunk;
auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
@@ -196,6 +204,7 @@ const int8_t* ColumnFetcher::getOneTableColumnFragment(
if (is_varlen) {
varlen_chunk_lock.reset(new std::lock_guard<std::mutex>(varlen_chunk_fetch_mutex_));
}
LOG(ERROR) << "DataProvider " << typeid(*data_provider_).name() << " getChunk";
chunk = data_provider_->getChunk(
col_info,
chunk_key,
@@ -212,8 +221,10 @@ const int8_t* ColumnFetcher::getOneTableColumnFragment(
chunk_iter_holder.push_back(chunk->begin_iterator(chunk_meta_it->second));
auto& chunk_iter = chunk_iter_holder.back();
if (memory_level == Data_Namespace::CPU_LEVEL) {
LOG(ERROR) << "[Vrl] cast only";
return reinterpret_cast<int8_t*>(&chunk_iter);
} else {
LOG(ERROR) << "[Vrl] chunk getChunk";
auto ab = chunk->getBuffer();
auto& row_set_mem_owner = executor_->getRowSetMemoryOwner();
row_set_mem_owner->addVarlenInputBuffer(ab);
@@ -225,6 +236,7 @@ const int8_t* ColumnFetcher::getOneTableColumnFragment(
return chunk_iter_gpu;
}
} else {
LOG(ERROR) << "[No ] chunk getChunk";
auto ab = chunk->getBuffer();
CHECK(ab->getMemoryPtr());
return ab->getMemoryPtr(); // @TODO(alex) change to use ChunkIter
@@ -238,6 +250,8 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments(
const int device_id,
DeviceAllocator* device_allocator,
const size_t thread_idx) const {
auto timer = DEBUG_TIMER(__func__);
INJECT_TIMER(getAllTableColumnFragments);
int db_id = col_info->db_id;
int table_id = col_info->table_id;
int col_id = col_info->column_id;
@@ -246,10 +260,17 @@
const auto fragments = fragments_it->second;
const auto frag_count = fragments->size();
std::vector<std::unique_ptr<ColumnarResults>> column_frags;
std::vector<ColumnarDataRefence> column_frags_raw;

const ColumnarResults* table_column = nullptr;
const InputDescriptor table_desc(db_id, table_id, int(0));
{
std::lock_guard<std::mutex> columnar_conversion_guard(columnar_fetch_mutex_);
auto timer = DEBUG_TIMER("lock taken, execution started");
LOG(ERROR) << __func__
<< " lock taken, execution started parent thread: " << thread_idx
<< "\n logger curr tid: " << logger::thread_id()
<< "\n scan_table tabid: " << table_id << " colid: " << col_id;
auto column_it = columnarized_scan_table_cache_.find({table_id, col_id});
if (column_it == columnarized_scan_table_cache_.end()) {
for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
@@ -273,7 +294,10 @@
chunk_iter_holder,
Data_Namespace::CPU_LEVEL,
int(0),
device_allocator);
device_allocator,
thread_idx);
// column_frags_raw.push_back(
// {col_buffer, fragment.getNumTuples(), chunk_meta_it->second->type()});
column_frags.push_back(
std::make_unique<ColumnarResults>(executor_->row_set_mem_owner_,
col_buffer,
@@ -283,6 +307,9 @@
}
auto merged_results =
ColumnarResults::mergeResults(executor_->row_set_mem_owner_, column_frags);
// auto merged_results =
// ColumnarResults::mergeResults(executor_->row_set_mem_owner_,
// column_frags_raw);
table_column = merged_results.get();
columnarized_scan_table_cache_.emplace(std::make_pair(table_id, col_id),
std::move(merged_results));
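Note: getAllTableColumnFragments converts each fragment into ColumnarResults under columnar_fetch_mutex_; the newly added <tbb/parallel_for.h> include, together with the commented-out column_frags_raw path, suggests the per-fragment conversion loop was being lined up for parallelization. Below is a hypothetical sketch of that shape; convertFragmentsInParallel and convert_one_fragment are stand-in names, not functions from this codebase.

#include <tbb/parallel_for.h>

#include <cstddef>
#include <cstdint>
#include <functional>
#include <vector>

// Convert each fragment into its own pre-sized slot so worker threads never
// touch shared state; a shared result cache would still need the mutex the
// original code already takes.
std::vector<std::vector<std::int8_t>> convertFragmentsInParallel(
    std::size_t frag_count,
    const std::function<std::vector<std::int8_t>(std::size_t)>& convert_one_fragment) {
  std::vector<std::vector<std::int8_t>> results(frag_count);
  tbb::parallel_for(std::size_t(0), frag_count, [&](std::size_t frag_id) {
    results[frag_id] = convert_one_fragment(frag_id);
  });
  return results;
}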
3 changes: 2 additions & 1 deletion omniscidb/QueryEngine/ColumnFetcher.h
@@ -73,7 +73,8 @@ class ColumnFetcher {
std::list<ChunkIter>& chunk_iter_holder,
const Data_Namespace::MemoryLevel memory_level,
const int device_id,
DeviceAllocator* device_allocator) const;
DeviceAllocator* device_allocator,
const size_t thread_idx = 0) const;

const int8_t* getAllTableColumnFragments(
ColumnInfoPtr col_info,
@@ -151,7 +151,9 @@ void QueryFragmentDescriptor::buildFragmentPerKernelForTable(
}

ExecutionKernelDescriptor execution_kernel_desc{
device_id, {}, fragment.getNumTuples()};
.device_id = device_id,
.fragments = {},
.outer_tuple_count = fragment.getNumTuples()};
if (table_desc_offset) {
const auto frag_ids =
executor->getTableFragmentIndices(ra_exe_unit,
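Note: this hunk only switches ExecutionKernelDescriptor construction from positional braces to C++20 designated initializers. A small sketch of the difference, using a hypothetical KernelDesc aggregate in place of the real struct:

#include <cstddef>
#include <vector>

// Hypothetical stand-in for ExecutionKernelDescriptor.
struct KernelDesc {
  int device_id = 0;
  std::vector<int> fragments;
  std::size_t outer_tuple_count = 0;
};

KernelDesc makeDesc(int device_id, std::size_t num_tuples) {
  // Positional form: KernelDesc{device_id, {}, num_tuples};
  // Designated initializers name each member, so the call site stays correct
  // (or fails to compile) if members are reordered or new ones are inserted.
  return KernelDesc{.device_id = device_id,
                    .fragments = {},
                    .outer_tuple_count = num_tuples};
}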
9 changes: 9 additions & 0 deletions omniscidb/QueryEngine/Execute.cpp
@@ -2329,6 +2329,7 @@ void Executor::executeWorkUnitPerFragment(
for (auto fragment_index : fragment_indexes) {
// We may want to consider in the future allowing this to execute on devices other
// than CPU
LOG(ERROR) << "kernel create and run with fragment_idx: " << fragment_index;
FragmentsList fragments_list{{db_id, table_id, {fragment_index}}};
ExecutionKernel kernel(ra_exe_unit,
co.device_type,
@@ -2496,6 +2497,7 @@ ResultSetPtr build_row_for_empty_input(
const ExecutorDeviceType device_type) {
std::vector<hdk::ir::ExprPtr> target_exprs_owned_copies;
std::vector<const hdk::ir::Expr*> target_exprs;
LOG(INFO) << "Building row for empty input.";
for (const auto target_expr : target_exprs_in) {
auto agg_expr = target_expr->as<hdk::ir::AggExpr>();
CHECK(agg_expr);
@@ -2990,6 +2992,8 @@ Executor::getRowCountAndOffsetForAllFrags(
const CartesianProduct<std::vector<std::vector<size_t>>>& frag_ids_crossjoin,
const std::vector<InputDescriptor>& input_descs,
const std::map<TableRef, const TableFragments*>& all_tables_fragments) {
auto timer = DEBUG_TIMER(__func__);
INJECT_TIMER(getRowCountAndOffsetForAllFrags);
std::vector<std::vector<int64_t>> all_num_rows;
std::vector<std::vector<uint64_t>> all_frag_offsets;
const auto tab_id_to_frag_offsets =
@@ -3048,6 +3052,8 @@ bool Executor::needFetchAllFragments(const InputColDescriptor& inner_col_desc,
CHECK_LT(static_cast<size_t>(nest_level), selected_fragments.size());
CHECK_EQ(table_id, selected_fragments[nest_level].table_id);
const auto& fragments = selected_fragments[nest_level].fragment_ids;
LOG(WARNING) << "needFetchAllFragments fragments size: " << fragments.size()
<< " table_id: " << table_id;
return fragments.size() > 1;
}

@@ -3128,6 +3134,7 @@ FetchResult Executor::fetchChunks(
memory_level_for_column = Data_Namespace::CPU_LEVEL;
}
if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) {
VLOG(1) << "Need to fetch all frags.";
// determine if we need special treatment to linearlize multi-frag table
// i.e., a column that is classified as varlen type, i.e., array
// for now, we can support more types in this way
@@ -3352,6 +3359,8 @@ void Executor::buildSelectedFragsMapping(
const std::list<std::shared_ptr<const InputColDescriptor>>& col_global_ids,
const FragmentsList& selected_fragments,
const RelAlgExecutionUnit& ra_exe_unit) {
auto timer = DEBUG_TIMER(__func__);
INJECT_TIMER(buildSelectedFragsMapping);
local_col_to_frag_pos.resize(plan_state_->global_to_local_col_ids_.size());
size_t frag_pos{0};
const auto& input_descs = ra_exe_unit.input_descs;
1 change: 1 addition & 0 deletions omniscidb/QueryEngine/JoinHashTable/HashJoin.cpp
@@ -380,6 +380,7 @@ int64_t HashJoin::getJoinHashBuffer(const ExecutorDeviceType device_type,
CompositeKeyInfo HashJoin::getCompositeKeyInfo(
const std::vector<InnerOuter>& inner_outer_pairs,
const Executor* executor) {
auto timer = DEBUG_TIMER(__func__);
CHECK(executor);
std::vector<const void*> sd_inner_proxy_per_key;
std::vector<const void*> sd_outer_proxy_per_key;