Skip to content
This repository has been archived by the owner on May 9, 2024. It is now read-only.

Commit

Permalink
[Join] InitHashTable optimisation
Browse files Browse the repository at this point in the history
This commit reworks `fill_hash_join_buff_bucketized_cpu` to use TBB and
utilize the CPU properly.

Partially resolves: #574

Signed-off-by: Dmitrii Makarenko <[email protected]>
  • Loading branch information
Devjiu committed Sep 13, 2023
1 parent 6fd1eec commit 74c47ff
Show file tree
Hide file tree
Showing 7 changed files with 389 additions and 92 deletions.
1 change: 1 addition & 0 deletions omniscidb/QueryEngine/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ set(query_engine_source_files
JoinHashTable/HashTable.cpp
JoinHashTable/PerfectJoinHashTable.cpp
JoinHashTable/Runtime/HashJoinRuntime.cpp
JoinHashTable/Runtime/HashJoinRuntimeCpu.cpp
L0Kernel.cpp
LogicalIR.cpp
LLVMFunctionAttributesUtil.cpp
Expand Down
2 changes: 1 addition & 1 deletion omniscidb/QueryEngine/ColumnFetcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ JoinColumn ColumnFetcher::makeJoinColumn(
data_provider,
column_cache);
if (col_buff != nullptr) {
join_chunk_array[num_chunks] = JoinChunk{col_buff, elem_count, num_elems};
num_elems += elem_count;
join_chunk_array[num_chunks] = JoinChunk{col_buff, elem_count};
} else {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,6 @@ class PerfectJoinHashTableBuilder {
0);

auto cpu_hash_table_buff = reinterpret_cast<int32_t*>(hash_table_->getCpuBuffer());
const int thread_count = cpu_threads();
std::vector<std::thread> init_cpu_buff_threads;

{
auto timer_init = DEBUG_TIMER("CPU One-To-One Perfect-Hash: init_hash_join_buff");
Expand All @@ -176,54 +174,36 @@ class PerfectJoinHashTableBuilder {
hash_join_invalid_val);
}
const bool for_semi_join = for_semi_anti_join(join_type);
std::atomic<int> err{0};
{
auto timer_fill =
DEBUG_TIMER("CPU One-To-One Perfect-Hash: fill_hash_join_buff_bucketized");
for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
init_cpu_buff_threads.emplace_back([hash_join_invalid_val,
&join_column,
str_proxy_translation_map,
thread_idx,
thread_count,
type,
&err,
&col_range,
&is_bitwise_eq,
&for_semi_join,
cpu_hash_table_buff,
hash_entry_info] {
int partial_err = fill_hash_join_buff_bucketized(
cpu_hash_table_buff,
hash_join_invalid_val,
for_semi_join,
join_column,
{static_cast<size_t>(type->size()),
col_range.getIntMin(),
col_range.getIntMax(),
inline_fixed_encoding_null_value(type),
is_bitwise_eq,
col_range.getIntMax() + 1,
get_join_column_type_kind(type)},
str_proxy_translation_map ? str_proxy_translation_map->data() : nullptr,
str_proxy_translation_map ? str_proxy_translation_map->domainStart()
: 0, // 0 is dummy value
thread_idx,
thread_count,
hash_entry_info.bucket_normalization);
int zero{0};
err.compare_exchange_strong(zero, partial_err);
});
}
for (auto& t : init_cpu_buff_threads) {
t.join();
DEBUG_TIMER("CPU One-To-One Perfect-Hash: fill_hash_join_buff_bucketized_cpu");

{
JoinColumnTypeInfo type_info{static_cast<size_t>(type->size()),
col_range.getIntMin(),
col_range.getIntMax(),
inline_fixed_encoding_null_value(type),
is_bitwise_eq,
col_range.getIntMax() + 1,
get_join_column_type_kind(type)};

int error = fill_hash_join_buff_bucketized_cpu(
cpu_hash_table_buff,
hash_join_invalid_val,
for_semi_join,
join_column,
type_info,
str_proxy_translation_map ? str_proxy_translation_map->data() : nullptr,
str_proxy_translation_map ? str_proxy_translation_map->domainStart()
: 0, // 0 is dummy value
hash_entry_info.bucket_normalization);
if (error) {
// Too many hash entries, need to retry with a 1:many table
hash_table_ = nullptr; // clear the hash table buffer
throw NeedsOneToManyHash();
}
}
}
if (err) {
// Too many hash entries, need to retry with a 1:many table
hash_table_ = nullptr; // clear the hash table buffer
throw NeedsOneToManyHash();
}
}

void initOneToManyHashTableOnCpu(
Expand Down
65 changes: 21 additions & 44 deletions omniscidb/QueryEngine/JoinHashTable/Runtime/HashJoinRuntime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#endif

#include <tbb/parallel_for.h>
#include <tbb/parallel_reduce.h>

#include <future>
#endif
Expand Down Expand Up @@ -73,35 +74,6 @@ namespace {
* ignore any element ID that is not in the dictionary corresponding to t1_s.x or is
* outside the range of column t1_s.
*/
inline int64_t translate_str_id_to_outer_dict(const int64_t elem,
const int64_t min_elem,
const int64_t max_elem,
const void* sd_inner_proxy,
const void* sd_outer_proxy) {
// The outer proxy is mandatory; the inner proxy is used without a check.
CHECK(sd_outer_proxy);
// Both proxies arrive type-erased, so recover the concrete proxy type first.
const auto* inner_proxy = static_cast<const StringDictionaryProxy*>(sd_inner_proxy);
const auto* outer_proxy = static_cast<const StringDictionaryProxy*>(sd_outer_proxy);
// Resolve the inner id to its string, then look that string up on the outer side.
const auto inner_str = inner_proxy->getString(elem);
const auto translated_id = outer_proxy->getIdOfString(inner_str);
// Ids outside the inclusive [min_elem, max_elem] range are reported as invalid.
const bool within_range = translated_id >= min_elem && translated_id <= max_elem;
if (!within_range) {
return StringDictionary::INVALID_STR_ID;
}
return translated_id;
}

// Translates an inner string id to an outer id through a lookup table that is
// indexed by (inner_elem - min_inner_elem). Returns
// StringDictionary::INVALID_STR_ID when the translated id lies outside the
// inclusive [min_outer_elem, max_outer_elem] range.
inline int64_t map_str_id_to_outer_dict(const int64_t inner_elem,
const int64_t min_inner_elem,
const int64_t min_outer_elem,
const int64_t max_outer_elem,
const int32_t* inner_to_outer_translation_map) {
const int64_t table_offset = inner_elem - min_inner_elem;
const auto translated_id = inner_to_outer_translation_map[table_offset];
if (translated_id >= min_outer_elem && translated_id <= max_outer_elem) {
return translated_id;
}
return StringDictionary::INVALID_STR_ID;
}

#if defined(_MSC_VER)
#define DEFAULT_TARGET_ATTRIBUTE
Expand Down Expand Up @@ -263,9 +235,11 @@ DEVICE auto fill_hash_join_buff_impl(int32_t* buff,
#endif
JoinColumnTyped col{&join_column, &type_info};
for (auto item : col.slice(start, step)) {
// LOG(ERROR) << "items: " << item.index;
const size_t index = item.index;
int64_t elem = item.element;
if (elem == type_info.null_val) {
// LOG(ERROR) << "null val";
if (type_info.uses_bw_eq) {
elem = type_info.translated_null_val;
} else {
Expand All @@ -275,11 +249,12 @@ DEVICE auto fill_hash_join_buff_impl(int32_t* buff,
#ifndef __CUDACC__
if (sd_inner_to_outer_translation_map &&
(!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
const auto outer_id = map_str_id_to_outer_dict(elem,
min_inner_elem,
type_info.min_val,
type_info.max_val,
sd_inner_to_outer_translation_map);
const auto outer_id =
cpu_utils::map_str_id_to_outer_dict(elem,
min_inner_elem,
type_info.min_val,
type_info.max_val,
sd_inner_to_outer_translation_map);
if (outer_id == StringDictionary::INVALID_STR_ID) {
continue;
}
Expand Down Expand Up @@ -678,11 +653,12 @@ DEVICE void count_matches_impl(int32_t* count_buff,
#ifndef __CUDACC__
if (sd_inner_to_outer_translation_map &&
(!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
const auto outer_id = map_str_id_to_outer_dict(elem,
min_inner_elem,
type_info.min_val,
type_info.max_val,
sd_inner_to_outer_translation_map);
const auto outer_id =
cpu_utils::map_str_id_to_outer_dict(elem,
min_inner_elem,
type_info.min_val,
type_info.max_val,
sd_inner_to_outer_translation_map);
if (outer_id == StringDictionary::INVALID_STR_ID) {
continue;
}
Expand Down Expand Up @@ -873,11 +849,12 @@ DEVICE void fill_row_ids_impl(int32_t* buff,
#ifndef __CUDACC__
if (sd_inner_to_outer_translation_map &&
(!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
const auto outer_id = map_str_id_to_outer_dict(elem,
min_inner_elem,
type_info.min_val,
type_info.max_val,
sd_inner_to_outer_translation_map);
const auto outer_id =
cpu_utils::map_str_id_to_outer_dict(elem,
min_inner_elem,
type_info.min_val,
type_info.max_val,
sd_inner_to_outer_translation_map);
if (outer_id == StringDictionary::INVALID_STR_ID) {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "../../DecodersImpl.h"
#else
#include "../../RuntimeFunctions.h"
#include "HashJoinRuntimeCpu.h"
#endif
#include "../../../QueryEngine/Compiler/CommonRuntimeDefs.h"
#include "../../../Shared/funcannotations.h"
Expand Down Expand Up @@ -66,7 +67,6 @@ void init_hash_join_buff(int32_t* buff,
void init_hash_join_buff_on_device(int32_t* buff,
const int64_t entry_count,
const int32_t invalid_slot_val);

#ifndef __CUDACC__

void init_baseline_hash_join_buff_32(int8_t* hash_join_buff,
Expand Down Expand Up @@ -101,6 +101,7 @@ struct JoinChunk {
const int8_t*
col_buff; // actually from AbstractBuffer::getMemoryPtr() via Chunk_NS::Chunk
size_t num_elems;
size_t row_id;
};

struct JoinColumn {
Expand Down
Loading

0 comments on commit 74c47ff

Please sign in to comment.