Skip to content
This repository has been archived by the owner on May 9, 2024. It is now read-only.

Commit

Permalink
[Bench] Reduce input cols number for fetch
Browse files Browse the repository at this point in the history
To estimate the output size of a query result and allocate a buffer, we
run `count*` before some actual queries. This estimation requires only
the query body arguments, without the `select` arguments, so this commit
changes input_cols for the `count*` query.

Resolves: #574

Signed-off-by: Dmitrii Makarenko <[email protected]>
  • Loading branch information
Devjiu committed Aug 1, 2023
1 parent a59f799 commit 22e4b41
Show file tree
Hide file tree
Showing 11 changed files with 227 additions and 113 deletions.
1 change: 1 addition & 0 deletions omniscidb/QueryEngine/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ set(query_engine_source_files
QueryExecutionSequence.cpp
QueryMemoryInitializer.cpp
RelAlgDagBuilder.cpp
RelAlgExecutionUnit.cpp
RelAlgExecutor.cpp
RelAlgTranslator.cpp
RelAlgOptimizer.cpp
Expand Down
3 changes: 2 additions & 1 deletion omniscidb/QueryEngine/CardinalityEstimator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,10 @@ RelAlgExecutionUnit create_ndv_execution_unit(const RelAlgExecutionUnit& ra_exe_

RelAlgExecutionUnit create_count_all_execution_unit(
const RelAlgExecutionUnit& ra_exe_unit,
const SchemaProvider* schema_provider,
hdk::ir::ExprPtr replacement_target) {
return {ra_exe_unit.input_descs,
ra_exe_unit.input_col_descs,
schema_provider,
ra_exe_unit.simple_quals,
ra_exe_unit.quals,
ra_exe_unit.join_quals,
Expand Down
1 change: 1 addition & 0 deletions omniscidb/QueryEngine/CardinalityEstimator.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ RelAlgExecutionUnit create_ndv_execution_unit(const RelAlgExecutionUnit& ra_exe_

RelAlgExecutionUnit create_count_all_execution_unit(
const RelAlgExecutionUnit& ra_exe_unit,
const SchemaProvider* schema_provider,
hdk::ir::ExprPtr replacement_target);

ResultSetPtr reduce_estimator_results(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,9 @@ void QueryFragmentDescriptor::buildFragmentPerKernelForTable(
}

ExecutionKernelDescriptor execution_kernel_desc{
device_id, {}, fragment.getNumTuples()};
/*device_id*/ device_id,
/*fragments*/ {},
/*outer_tuple_count*/ fragment.getNumTuples()};
if (table_desc_offset) {
const auto frag_ids =
executor->getTableFragmentIndices(ra_exe_unit,
Expand Down
15 changes: 4 additions & 11 deletions omniscidb/QueryEngine/JoinFilterPushDown.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "IR/ExprCollector.h"
#include "IR/ExprRewriter.h"
#include "RelAlgExecutor.h"
#include "Visitors/UsedInputsCollector.h"

namespace {

Expand All @@ -27,15 +28,6 @@ class BindFilterToOutermostVisitor : public hdk::ir::ExprRewriter {
}
};

// Expression collector that records one InputColDescriptor for every column
// variable it visits. The second constructor argument of the descriptor
// (presumably the nest level) is always 0.
class InputColumnsCollector
    : public hdk::ir::ExprCollector<std::unordered_set<InputColDescriptor>,
                                    InputColumnsCollector> {
 protected:
  void visitColumnVar(const hdk::ir::ColumnVar* col_var) override {
    const auto& column_info = col_var->columnInfo();
    result_.insert(InputColDescriptor(column_info, 0));
  }
};

} // namespace

/**
Expand All @@ -49,7 +41,7 @@ FilterSelectivity RelAlgExecutor::getFilterSelectivity(
const std::vector<hdk::ir::ExprPtr>& filter_expressions,
const CompilationOptions& co,
const ExecutionOptions& eo) {
InputColumnsCollector input_columns_collector;
UsedInputsCollector input_columns_collector;
std::list<hdk::ir::ExprPtr> quals;
BindFilterToOutermostVisitor bind_filter_to_outermost;
for (const auto& filter_expr : filter_expressions) {
Expand All @@ -59,7 +51,8 @@ FilterSelectivity RelAlgExecutor::getFilterSelectivity(
auto& input_column_descriptors = input_columns_collector.result();
std::vector<InputDescriptor> input_descs;
std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
for (const auto& input_col_desc : input_column_descriptors) {
for (const auto& input_col_var : input_column_descriptors) {
auto input_col_desc = InputColDescriptor(input_col_var.columnInfo(), 0);
if (input_descs.empty()) {
input_descs.push_back(input_col_desc.getScanDesc());
} else {
Expand Down
75 changes: 75 additions & 0 deletions omniscidb/QueryEngine/RelAlgExecutionUnit.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright (C) 2023 Intel Corporation
* Copyright 2017 MapD Technologies, Inc.
*
* SPDX-License-Identifier: Apache-2.0
*/

#include "RelAlgExecutionUnit.h"
#include "Visitors/UsedInputsCollector.h"

void RelAlgExecutionUnit::calcInputColDescs(const SchemaProvider* schema_provider) {
// Scan all currently used expressions to determine used columns.
UsedInputsCollector collector;
for (auto& expr : simple_quals) {
collector.visit(expr.get());
}
for (auto& expr : quals) {
collector.visit(expr.get());
}
for (auto& expr : groupby_exprs) {
if (expr) {
collector.visit(expr.get());
}
}
for (auto& join_qual : join_quals) {
for (auto& expr : join_qual.quals) {
collector.visit(expr.get());
}
}

for (auto& expr : target_exprs) {
collector.visit(expr);
}

if (partition_offsets_col) {
collector.visit(partition_offsets_col.get());
}

std::vector<std::shared_ptr<const InputColDescriptor>> col_descs;
for (auto& col_var : collector.result()) {
col_descs.push_back(std::make_shared<const InputColDescriptor>(col_var.columnInfo(),
col_var.rteIdx()));
}

// For UNION we only have column variables for a single table used
// in target expressions but should mark all columns as used.
if (union_all && !col_descs.empty()) {
CHECK_EQ(col_descs.front()->getNestLevel(), 0);
CHECK_EQ(input_descs.size(), (size_t)2);
TableRef processed_table_ref(col_descs.front()->getDatabaseId(),
col_descs.front()->getTableId());
for (auto tdesc : input_descs) {
if (tdesc.getTableRef() != processed_table_ref) {
auto columns = schema_provider->listColumns(tdesc.getTableRef());
for (auto& col_info : columns) {
if (!col_info->is_rowid) {
col_descs.push_back(std::make_shared<InputColDescriptor>(col_info, 0));
}
}
}
}
}

std::sort(
col_descs.begin(),
col_descs.end(),
[](std::shared_ptr<const InputColDescriptor> const& lhs,
std::shared_ptr<const InputColDescriptor> const& rhs) {
return std::make_tuple(lhs->getNestLevel(), lhs->getColId(), lhs->getTableId()) <
std::make_tuple(rhs->getNestLevel(), rhs->getColId(), rhs->getTableId());
});

input_col_descs.clear();
input_col_descs.insert(input_col_descs.end(), col_descs.begin(), col_descs.end());
}
104 changes: 94 additions & 10 deletions omniscidb/QueryEngine/RelAlgExecutionUnit.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/*
/**
* Copyright 2023 Intel Corporation
* Copyright 2017 MapD Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -23,8 +24,7 @@
* Copyright (c) 2016 MapD Technologies, Inc. All rights reserved.
**/

#ifndef QUERYENGINE_RELALGEXECUTIONUNIT_H
#define QUERYENGINE_RELALGEXECUTIONUNIT_H
#pragma once

#include "CostModel/CostModel.h"
#include "Descriptors/InputDescriptors.h"
Expand Down Expand Up @@ -64,14 +64,15 @@ using QueryPlanHash = size_t;
// used to detect a correct cached hashtable
struct HashTableBuildDag {
public:
HashTableBuildDag(const JoinColumnsInfo& in_inner_cols_info,
const JoinColumnsInfo& in_outer_cols_info,
const QueryPlan& in_inner_cols_access_path,
const QueryPlan& in_outer_cols_access_path)
explicit HashTableBuildDag(const JoinColumnsInfo& in_inner_cols_info,
const JoinColumnsInfo& in_outer_cols_info,
const QueryPlan& in_inner_cols_access_path,
const QueryPlan& in_outer_cols_access_path)
: inner_cols_info(in_inner_cols_info)
, outer_cols_info(in_outer_cols_info)
, inner_cols_access_path(in_inner_cols_access_path)
, outer_cols_access_path(in_outer_cols_access_path) {}

JoinColumnsInfo inner_cols_info;
JoinColumnsInfo outer_cols_info;
QueryPlan inner_cols_access_path;
Expand Down Expand Up @@ -127,7 +128,88 @@ struct JoinCondition {

using JoinQualsPerNestingLevel = std::vector<JoinCondition>;

struct RelAlgExecutionUnit {
class RelAlgExecutionUnit {
public:
RelAlgExecutionUnit(
std::vector<InputDescriptor> input_descs,
std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs,
std::list<hdk::ir::ExprPtr> simple_quals,
std::list<hdk::ir::ExprPtr> quals,
const JoinQualsPerNestingLevel join_quals,
std::list<hdk::ir::ExprPtr> groupby_exprs,
std::vector<const hdk::ir::Expr*> target_exprs,
const std::shared_ptr<hdk::ir::Estimator> estimator,
const SortInfo sort_info,
size_t scan_limit,
QueryPlan query_plan_dag = {EMPTY_QUERY_PLAN},
HashTableBuildDagMap hash_table_build_plan_dag = {},
TableIdToNodeMap table_id_to_node_map = {},
const std::optional<bool> union_all = {},
std::optional<hdk::ir::ShuffleFunction> shuffle_fn = {},
hdk::ir::ExprPtr partition_offsets_col = nullptr,
bool partitioned_aggregation = false,
std::shared_ptr<costmodel::CostModel> cost_model = nullptr,
std::vector<costmodel::AnalyticalTemplate> templs = {})
: input_descs(std::move(input_descs))
, input_col_descs(std::move(input_col_descs))
, simple_quals(std::move(simple_quals))
, quals(std::move(quals))
, join_quals(std::move(join_quals))
, groupby_exprs(std::move(groupby_exprs))
, target_exprs(std::move(target_exprs))
, estimator(std::move(estimator))
, sort_info(std::move(sort_info))
, scan_limit(scan_limit)
, query_plan_dag(std::move(query_plan_dag))
, hash_table_build_plan_dag(std::move(hash_table_build_plan_dag))
, table_id_to_node_map(std::move(table_id_to_node_map))
, union_all(union_all)
, shuffle_fn(std::move(shuffle_fn))
, partition_offsets_col(std::move(partition_offsets_col))
, partitioned_aggregation(partitioned_aggregation)
, cost_model(std::move(cost_model))
, templs(std::move(templs)) {}

RelAlgExecutionUnit(std::vector<InputDescriptor> input_descs,
const SchemaProvider* schema_provider,
std::list<hdk::ir::ExprPtr> simple_quals,
std::list<hdk::ir::ExprPtr> quals,
const JoinQualsPerNestingLevel join_quals,
std::list<hdk::ir::ExprPtr> groupby_exprs,
std::vector<const hdk::ir::Expr*> target_exprs,
const std::shared_ptr<hdk::ir::Estimator> estimator,
const SortInfo sort_info,
size_t scan_limit,
QueryPlan query_plan_dag = {EMPTY_QUERY_PLAN},
HashTableBuildDagMap hash_table_build_plan_dag = {},
TableIdToNodeMap table_id_to_node_map = {},
const std::optional<bool> union_all = {},
std::optional<hdk::ir::ShuffleFunction> shuffle_fn = {},
hdk::ir::ExprPtr partition_offsets_col = nullptr,
bool partitioned_aggregation = false,
std::shared_ptr<costmodel::CostModel> cost_model = nullptr,
std::vector<costmodel::AnalyticalTemplate> templs = {})
: input_descs(std::move(input_descs))
, simple_quals(std::move(simple_quals))
, quals(std::move(quals))
, join_quals(std::move(join_quals))
, groupby_exprs(std::move(groupby_exprs))
, target_exprs(std::move(target_exprs))
, estimator(std::move(estimator))
, sort_info(std::move(sort_info))
, scan_limit(scan_limit)
, query_plan_dag(std::move(query_plan_dag))
, hash_table_build_plan_dag(std::move(hash_table_build_plan_dag))
, table_id_to_node_map(std::move(table_id_to_node_map))
, union_all(union_all)
, shuffle_fn(std::move(shuffle_fn))
, partition_offsets_col(std::move(partition_offsets_col))
, partitioned_aggregation(partitioned_aggregation)
, cost_model(std::move(cost_model))
, templs(std::move(templs)) {
calcInputColDescs(schema_provider);
}

std::vector<InputDescriptor> input_descs;
std::list<std::shared_ptr<const InputColDescriptor>> input_col_descs;
std::list<hdk::ir::ExprPtr> simple_quals;
Expand Down Expand Up @@ -158,9 +240,11 @@ struct RelAlgExecutionUnit {

bool isShuffleCount() const { return shuffle_fn && !partition_offsets_col; }
bool isShuffle() const { return shuffle_fn && partition_offsets_col; }

private:
// Populates input_col_descs from the columns referenced by this unit's qualifiers and expressions.
void calcInputColDescs(const SchemaProvider* schema_provider);
};

std::ostream& operator<<(std::ostream& os, const RelAlgExecutionUnit& ra_exe_unit);
std::string ra_exec_unit_desc_for_caching(const RelAlgExecutionUnit& ra_exe_unit);

#endif // QUERYENGINE_RELALGEXECUTIONUNIT_H
18 changes: 2 additions & 16 deletions omniscidb/QueryEngine/RelAlgExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -473,20 +473,6 @@ void RelAlgExecutor::handleNop(RaExecutionDesc& ed) {

namespace {

// Hash functor for hdk::ir::ColumnRef that forwards to ColumnRef::hash(), so
// that ColumnRef can be stored in unordered containers.
struct ColumnRefHash {
  size_t operator()(const hdk::ir::ColumnRef& ref) const {
    return ref.hash();
  }
};

using ColumnRefSet = std::unordered_set<hdk::ir::ColumnRef, ColumnRefHash>;

// Expression collector that stores a copy of every visited ColumnRef in a
// ColumnRefSet.
class UsedInputsCollector
    : public hdk::ir::ExprCollector<ColumnRefSet, UsedInputsCollector> {
 protected:
  void visitColumnRef(const hdk::ir::ColumnRef* col_ref) override {
    const hdk::ir::ColumnRef& ref = *col_ref;
    result_.insert(ref);
  }
};

const hdk::ir::Node* get_data_sink(const hdk::ir::Node* ra_node) {
if (auto join = dynamic_cast<const hdk::ir::Join*>(ra_node)) {
CHECK_EQ(size_t(2), join->inputCount());
Expand Down Expand Up @@ -1526,8 +1512,8 @@ std::optional<size_t> RelAlgExecutor::getFilteredCountAll(const WorkUnit& work_u
count_all_agg = builder.count();
}

const auto count_all_exe_unit =
create_count_all_execution_unit(work_unit.exe_unit, count_all_agg.expr());
const auto count_all_exe_unit = create_count_all_execution_unit(
work_unit.exe_unit, schema_provider_.get(), count_all_agg.expr());
size_t one{1};
hdk::ResultSetTable count_all_result;
try {
Expand Down
27 changes: 27 additions & 0 deletions omniscidb/QueryEngine/Visitors/UsedInputsCollector.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright (C) 2023 Intel Corporation
*
* SPDX-License-Identifier: Apache-2.0
*/

#pragma once

#include "IR/ExprCollector.h"

#include <unordered_set>

// Hash functor for hdk::ir::ColumnVar that forwards to ColumnVar::hash(), so
// that ColumnVar can be used as a key in unordered containers.
struct ColumnVarHash {
  size_t operator()(const hdk::ir::ColumnVar& var) const {
    return var.hash();
  }
};

using ColumnVarSet = std::unordered_set<hdk::ir::ColumnVar, ColumnVarHash>;

// Expression collector that stores a copy of every visited ColumnVar in a
// ColumnVarSet.
class UsedInputsCollector
    : public hdk::ir::ExprCollector<ColumnVarSet, UsedInputsCollector> {
 protected:
  // Visiting a ColumnRef unconditionally fails the CHECK: this collector only
  // handles ColumnVar nodes.
  void visitColumnRef(const hdk::ir::ColumnRef* /*col_ref*/) override {
    CHECK(false);
  }

  // Records a copy of the visited column variable.
  void visitColumnVar(const hdk::ir::ColumnVar* col_var) override {
    const hdk::ir::ColumnVar& var = *col_var;
    result_.insert(var);
  }
};
Loading

0 comments on commit 22e4b41

Please sign in to comment.