Skip to content
This repository has been archived by the owner on May 9, 2024. It is now read-only.

Commit

Permalink
[Join] InitHashTable optimisation
Browse files Browse the repository at this point in the history
This commit reworks `fill_hash_join_buff_bucketized_cpu` to use TBB and
utilize the CPU properly.

Partially resolves: #574

Signed-off-by: Dmitrii Makarenko <[email protected]>
  • Loading branch information
Devjiu committed Sep 18, 2023
1 parent 585911b commit 3c9ee01
Show file tree
Hide file tree
Showing 7 changed files with 373 additions and 97 deletions.
1 change: 1 addition & 0 deletions omniscidb/QueryEngine/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ set(query_engine_source_files
JoinHashTable/HashTable.cpp
JoinHashTable/PerfectJoinHashTable.cpp
JoinHashTable/Runtime/HashJoinRuntime.cpp
JoinHashTable/Runtime/HashJoinRuntimeCpu.cpp
L0Kernel.cpp
LogicalIR.cpp
LLVMFunctionAttributesUtil.cpp
Expand Down
2 changes: 1 addition & 1 deletion omniscidb/QueryEngine/ColumnFetcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ JoinColumn ColumnFetcher::makeJoinColumn(
data_provider,
column_cache);
if (col_buff != nullptr) {
join_chunk_array[num_chunks] = JoinChunk{col_buff, elem_count, num_elems};
num_elems += elem_count;
join_chunk_array[num_chunks] = JoinChunk{col_buff, elem_count};
} else {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#pragma once

#include "QueryEngine/JoinHashTable/PerfectHashTable.h"
#include "QueryEngine/JoinHashTable/Runtime/HashJoinRuntimeCpu.h"

#include "Shared/scope.h"

Expand Down Expand Up @@ -166,8 +167,6 @@ class PerfectJoinHashTableBuilder {
0);

auto cpu_hash_table_buff = reinterpret_cast<int32_t*>(hash_table_->getCpuBuffer());
const int thread_count = cpu_threads();
std::vector<std::thread> init_cpu_buff_threads;

{
auto timer_init = DEBUG_TIMER("CPU One-To-One Perfect-Hash: init_hash_join_buff");
Expand All @@ -176,54 +175,36 @@ class PerfectJoinHashTableBuilder {
hash_join_invalid_val);
}
const bool for_semi_join = for_semi_anti_join(join_type);
std::atomic<int> err{0};
{
auto timer_fill =
DEBUG_TIMER("CPU One-To-One Perfect-Hash: fill_hash_join_buff_bucketized");
for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
init_cpu_buff_threads.emplace_back([hash_join_invalid_val,
&join_column,
str_proxy_translation_map,
thread_idx,
thread_count,
type,
&err,
&col_range,
&is_bitwise_eq,
&for_semi_join,
cpu_hash_table_buff,
hash_entry_info] {
int partial_err = fill_hash_join_buff_bucketized(
cpu_hash_table_buff,
hash_join_invalid_val,
for_semi_join,
join_column,
{static_cast<size_t>(type->size()),
col_range.getIntMin(),
col_range.getIntMax(),
inline_fixed_encoding_null_value(type),
is_bitwise_eq,
col_range.getIntMax() + 1,
get_join_column_type_kind(type)},
str_proxy_translation_map ? str_proxy_translation_map->data() : nullptr,
str_proxy_translation_map ? str_proxy_translation_map->domainStart()
: 0, // 0 is dummy value
thread_idx,
thread_count,
hash_entry_info.bucket_normalization);
int zero{0};
err.compare_exchange_strong(zero, partial_err);
});
}
for (auto& t : init_cpu_buff_threads) {
t.join();
DEBUG_TIMER("CPU One-To-One Perfect-Hash: fill_hash_join_buff_bucketized_cpu");

{
JoinColumnTypeInfo type_info{static_cast<size_t>(type->size()),
col_range.getIntMin(),
col_range.getIntMax(),
inline_fixed_encoding_null_value(type),
is_bitwise_eq,
col_range.getIntMax() + 1,
get_join_column_type_kind(type)};

int error = fill_hash_join_buff_bucketized_cpu(
cpu_hash_table_buff,
hash_join_invalid_val,
for_semi_join,
join_column,
type_info,
str_proxy_translation_map ? str_proxy_translation_map->data() : nullptr,
str_proxy_translation_map ? str_proxy_translation_map->domainStart()
: 0, // 0 is dummy value
hash_entry_info.bucket_normalization);
if (error) {
// Too many hash entries, need to retry with a 1:many table
hash_table_ = nullptr; // clear the hash table buffer
throw NeedsOneToManyHash();
}
}
}
if (err) {
// Too many hash entries, need to retry with a 1:many table
hash_table_ = nullptr; // clear the hash table buffer
throw NeedsOneToManyHash();
}
}

void initOneToManyHashTableOnCpu(
Expand Down
51 changes: 1 addition & 50 deletions omniscidb/QueryEngine/JoinHashTable/Runtime/HashJoinRuntime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#else
#include "Logger/Logger.h"

#include "HashJoinRuntimeCpu.h"
#include "QueryEngine/RuntimeFunctions.h"
#include "Shared/likely.h"
#include "StringDictionary/StringDictionary.h"
Expand All @@ -53,56 +54,6 @@
#ifndef __CUDACC__
namespace {

/**
 * Translates a string ID from the inner column's dictionary into the ID space of
 * the outer column's dictionary.
 *
 * Joins between two dictionary encoded string columns without a shared string dictionary
 * are computed by translating the inner dictionary to the outer dictionary while filling
 * the hash table. The translation works as follows:
 *
 * Given two tables t1 and t2, with t1 the outer table and t2 the inner table, and two
 * columns t1.x and t2.x, both dictionary encoded strings without a shared dictionary, we
 * read each value in t2.x and do a lookup in the dictionary for t1.x. If the lookup
 * returns a valid ID, we insert that ID into the hash table. Otherwise, we skip adding an
 * entry into the hash table for the inner column. We can also skip adding any entries
 * that are outside the range of the outer column.
 *
 * Consider a join of the form SELECT x, n FROM (SELECT x, COUNT(*) n FROM t1 GROUP BY x
 * HAVING n > 10), t2 WHERE t1.x = t2.x; Let the result of the subquery be t1_s.
 * Due to the HAVING clause, the range of all IDs in t1_s must be less than or equal to
 * the range of all IDs in t1. Suppose we have an element a in t2.x that is also in
 * t1_s.x. Then the ID of a must be within the range of t1_s. Therefore it is safe to
 * ignore any element ID that is not in the dictionary corresponding to t1_s.x or is
 * outside the range of column t1_s.
 *
 * @param elem            string ID read from the inner column
 * @param min_elem        smallest outer ID that is accepted (inclusive)
 * @param max_elem        largest outer ID that is accepted (inclusive)
 * @param sd_inner_proxy  type-erased pointer to the inner StringDictionaryProxy;
 *                        must not be null
 * @param sd_outer_proxy  type-erased pointer to the outer StringDictionaryProxy;
 *                        must not be null
 * @return the ID found in the outer dictionary, or StringDictionary::INVALID_STR_ID
 *         when that ID lies outside [min_elem, max_elem]
 */
inline int64_t translate_str_id_to_outer_dict(const int64_t elem,
                                              const int64_t min_elem,
                                              const int64_t max_elem,
                                              const void* sd_inner_proxy,
                                              const void* sd_outer_proxy) {
  // Both proxies are dereferenced below, so both are validated up front; previously
  // only the outer proxy was checked and a null inner proxy was dereferenced blindly.
  CHECK(sd_inner_proxy);
  CHECK(sd_outer_proxy);
  const auto sd_inner_dict_proxy =
      static_cast<const StringDictionaryProxy*>(sd_inner_proxy);
  const auto sd_outer_dict_proxy =
      static_cast<const StringDictionaryProxy*>(sd_outer_proxy);
  // Resolve the inner ID to its string, then look that string up in the outer dictionary.
  const auto elem_str = sd_inner_dict_proxy->getString(elem);
  const auto outer_id = sd_outer_dict_proxy->getIdOfString(elem_str);
  // NOTE(review): a string missing from the outer dictionary presumably yields an ID
  // below min_elem and is rejected by this same range test - confirm against
  // StringDictionaryProxy::getIdOfString.
  if (outer_id > max_elem || outer_id < min_elem) {
    return StringDictionary::INVALID_STR_ID;
  }
  return outer_id;
}

/**
 * Maps an inner-dictionary string ID to an outer-dictionary ID through a
 * precomputed translation table.
 *
 * The table is indexed by the inner ID's offset from min_inner_elem. The looked-up
 * ID is returned only when it lies inside [min_outer_elem, max_outer_elem];
 * otherwise StringDictionary::INVALID_STR_ID is returned.
 *
 * @param inner_elem                      string ID read from the inner column
 * @param min_inner_elem                  inner ID that corresponds to table slot 0
 * @param min_outer_elem                  smallest accepted outer ID (inclusive)
 * @param max_outer_elem                  largest accepted outer ID (inclusive)
 * @param inner_to_outer_translation_map  table of outer IDs indexed by
 *                                        (inner_elem - min_inner_elem)
 */
inline int64_t map_str_id_to_outer_dict(const int64_t inner_elem,
                                        const int64_t min_inner_elem,
                                        const int64_t min_outer_elem,
                                        const int64_t max_outer_elem,
                                        const int32_t* inner_to_outer_translation_map) {
  const int64_t table_slot = inner_elem - min_inner_elem;
  const int32_t translated_id = inner_to_outer_translation_map[table_slot];
  const bool within_outer_range =
      translated_id >= min_outer_elem && translated_id <= max_outer_elem;
  if (within_outer_range) {
    return translated_id;
  }
  return StringDictionary::INVALID_STR_ID;
}

#if defined(_MSC_VER)
#define DEFAULT_TARGET_ATTRIBUTE
#else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ struct JoinChunk {
const int8_t*
col_buff; // actually from AbstractBuffer::getMemoryPtr() via Chunk_NS::Chunk
size_t num_elems;
size_t row_id;
};

struct JoinColumn {
Expand Down
Loading

0 comments on commit 3c9ee01

Please sign in to comment.