diff --git a/omniscidb/QueryEngine/CMakeLists.txt b/omniscidb/QueryEngine/CMakeLists.txt index 69d1bd61a..1c2a1e857 100644 --- a/omniscidb/QueryEngine/CMakeLists.txt +++ b/omniscidb/QueryEngine/CMakeLists.txt @@ -61,6 +61,7 @@ set(query_engine_source_files JoinHashTable/HashTable.cpp JoinHashTable/PerfectJoinHashTable.cpp JoinHashTable/Runtime/HashJoinRuntime.cpp + JoinHashTable/Runtime/HashJoinRuntimeCpu.cpp L0Kernel.cpp LogicalIR.cpp LLVMFunctionAttributesUtil.cpp diff --git a/omniscidb/QueryEngine/ColumnFetcher.cpp b/omniscidb/QueryEngine/ColumnFetcher.cpp index f95223edf..20a73868f 100644 --- a/omniscidb/QueryEngine/ColumnFetcher.cpp +++ b/omniscidb/QueryEngine/ColumnFetcher.cpp @@ -146,8 +146,8 @@ JoinColumn ColumnFetcher::makeJoinColumn( data_provider, column_cache); if (col_buff != nullptr) { + join_chunk_array[num_chunks] = JoinChunk{col_buff, elem_count, num_elems}; num_elems += elem_count; - join_chunk_array[num_chunks] = JoinChunk{col_buff, elem_count}; } else { continue; } diff --git a/omniscidb/QueryEngine/JoinHashTable/Builders/PerfectHashTableBuilder.h b/omniscidb/QueryEngine/JoinHashTable/Builders/PerfectHashTableBuilder.h index a77f7abfc..5d880f4a6 100644 --- a/omniscidb/QueryEngine/JoinHashTable/Builders/PerfectHashTableBuilder.h +++ b/omniscidb/QueryEngine/JoinHashTable/Builders/PerfectHashTableBuilder.h @@ -166,8 +166,6 @@ class PerfectJoinHashTableBuilder { 0); auto cpu_hash_table_buff = reinterpret_cast(hash_table_->getCpuBuffer()); - const int thread_count = cpu_threads(); - std::vector init_cpu_buff_threads; { auto timer_init = DEBUG_TIMER("CPU One-To-One Perfect-Hash: init_hash_join_buff"); @@ -176,54 +174,36 @@ class PerfectJoinHashTableBuilder { hash_join_invalid_val); } const bool for_semi_join = for_semi_anti_join(join_type); - std::atomic err{0}; { auto timer_fill = - DEBUG_TIMER("CPU One-To-One Perfect-Hash: fill_hash_join_buff_bucketized"); - for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) { - init_cpu_buff_threads.emplace_back([hash_join_invalid_val, - &join_column, - str_proxy_translation_map, - thread_idx, - thread_count, - type, - &err, - &col_range, - &is_bitwise_eq, - &for_semi_join, - cpu_hash_table_buff, - hash_entry_info] { - int partial_err = fill_hash_join_buff_bucketized( - cpu_hash_table_buff, - hash_join_invalid_val, - for_semi_join, - join_column, - {static_cast(type->size()), - col_range.getIntMin(), - col_range.getIntMax(), - inline_fixed_encoding_null_value(type), - is_bitwise_eq, - col_range.getIntMax() + 1, - get_join_column_type_kind(type)}, - str_proxy_translation_map ? str_proxy_translation_map->data() : nullptr, - str_proxy_translation_map ? str_proxy_translation_map->domainStart() - : 0, // 0 is dummy value - thread_idx, - thread_count, - hash_entry_info.bucket_normalization); - int zero{0}; - err.compare_exchange_strong(zero, partial_err); - }); - } - for (auto& t : init_cpu_buff_threads) { - t.join(); + DEBUG_TIMER("CPU One-To-One Perfect-Hash: fill_hash_join_buff_bucketized_cpu"); + + { + JoinColumnTypeInfo type_info{static_cast(type->size()), + col_range.getIntMin(), + col_range.getIntMax(), + inline_fixed_encoding_null_value(type), + is_bitwise_eq, + col_range.getIntMax() + 1, + get_join_column_type_kind(type)}; + + int error = fill_hash_join_buff_bucketized_cpu( + cpu_hash_table_buff, + hash_join_invalid_val, + for_semi_join, + join_column, + type_info, + str_proxy_translation_map ? str_proxy_translation_map->data() : nullptr, + str_proxy_translation_map ? str_proxy_translation_map->domainStart() + : 0, // 0 is dummy value + hash_entry_info.bucket_normalization); + if (error) { + // Too many hash entries, need to retry with a 1:many table + hash_table_ = nullptr; // clear the hash table buffer + throw NeedsOneToManyHash(); + } } } - if (err) { - // Too many hash entries, need to retry with a 1:many table - hash_table_ = nullptr; // clear the hash table buffer - throw NeedsOneToManyHash(); - } } void initOneToManyHashTableOnCpu( diff --git a/omniscidb/QueryEngine/JoinHashTable/Runtime/HashJoinRuntime.cpp b/omniscidb/QueryEngine/JoinHashTable/Runtime/HashJoinRuntime.cpp index caf54bfa2..502330192 100644 --- a/omniscidb/QueryEngine/JoinHashTable/Runtime/HashJoinRuntime.cpp +++ b/omniscidb/QueryEngine/JoinHashTable/Runtime/HashJoinRuntime.cpp @@ -38,6 +38,7 @@ #endif #include +#include #include #endif @@ -73,35 +74,6 @@ namespace { * ignore any element ID that is not in the dictionary corresponding to t1_s.x or is * outside the range of column t1_s. */ -inline int64_t translate_str_id_to_outer_dict(const int64_t elem, - const int64_t min_elem, - const int64_t max_elem, - const void* sd_inner_proxy, - const void* sd_outer_proxy) { - CHECK(sd_outer_proxy); - const auto sd_inner_dict_proxy = - static_cast(sd_inner_proxy); - const auto sd_outer_dict_proxy = - static_cast(sd_outer_proxy); - const auto elem_str = sd_inner_dict_proxy->getString(elem); - const auto outer_id = sd_outer_dict_proxy->getIdOfString(elem_str); - if (outer_id > max_elem || outer_id < min_elem) { - return StringDictionary::INVALID_STR_ID; - } - return outer_id; -} - -inline int64_t map_str_id_to_outer_dict(const int64_t inner_elem, - const int64_t min_inner_elem, - const int64_t min_outer_elem, - const int64_t max_outer_elem, - const int32_t* inner_to_outer_translation_map) { - const auto outer_id = inner_to_outer_translation_map[inner_elem - min_inner_elem]; - if (outer_id > max_outer_elem || outer_id < min_outer_elem) { - return StringDictionary::INVALID_STR_ID; - } - return outer_id; -} #if defined(_MSC_VER) #define DEFAULT_TARGET_ATTRIBUTE @@ -263,9 +235,11 @@ DEVICE auto fill_hash_join_buff_impl(int32_t* buff, #endif JoinColumnTyped col{&join_column, &type_info}; for (auto item : col.slice(start, step)) { + // LOG(ERROR) << "items: " << item.index; const size_t index = item.index; int64_t elem = item.element; if (elem == type_info.null_val) { + // LOG(ERROR) << "null val"; if (type_info.uses_bw_eq) { elem = type_info.translated_null_val; } else { @@ -275,11 +249,12 @@ DEVICE auto fill_hash_join_buff_impl(int32_t* buff, #ifndef __CUDACC__ if (sd_inner_to_outer_translation_map && (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) { - const auto outer_id = map_str_id_to_outer_dict(elem, - min_inner_elem, - type_info.min_val, - type_info.max_val, - sd_inner_to_outer_translation_map); + const auto outer_id = + cpu_utils::map_str_id_to_outer_dict(elem, + min_inner_elem, + type_info.min_val, + type_info.max_val, + sd_inner_to_outer_translation_map); if (outer_id == StringDictionary::INVALID_STR_ID) { continue; } @@ -678,11 +653,12 @@ DEVICE void count_matches_impl(int32_t* count_buff, #ifndef __CUDACC__ if (sd_inner_to_outer_translation_map && (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) { - const auto outer_id = map_str_id_to_outer_dict(elem, - min_inner_elem, - type_info.min_val, - type_info.max_val, - sd_inner_to_outer_translation_map); + const auto outer_id = + cpu_utils::map_str_id_to_outer_dict(elem, + min_inner_elem, + type_info.min_val, + type_info.max_val, + sd_inner_to_outer_translation_map); if (outer_id == StringDictionary::INVALID_STR_ID) { continue; } @@ -873,11 +849,12 @@ DEVICE void fill_row_ids_impl(int32_t* buff, #ifndef __CUDACC__ if (sd_inner_to_outer_translation_map && (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) { - const auto outer_id = map_str_id_to_outer_dict(elem, - min_inner_elem, - type_info.min_val, - type_info.max_val, - sd_inner_to_outer_translation_map); + const auto outer_id = + cpu_utils::map_str_id_to_outer_dict(elem, + min_inner_elem, + type_info.min_val, + type_info.max_val, + sd_inner_to_outer_translation_map); if (outer_id == StringDictionary::INVALID_STR_ID) { continue; } diff --git a/omniscidb/QueryEngine/JoinHashTable/Runtime/HashJoinRuntime.h b/omniscidb/QueryEngine/JoinHashTable/Runtime/HashJoinRuntime.h index 341158063..d43788f04 100644 --- a/omniscidb/QueryEngine/JoinHashTable/Runtime/HashJoinRuntime.h +++ b/omniscidb/QueryEngine/JoinHashTable/Runtime/HashJoinRuntime.h @@ -34,6 +34,7 @@ #include "../../DecodersImpl.h" #else #include "../../RuntimeFunctions.h" +#include "HashJoinRuntimeCpu.h" #endif #include "../../../QueryEngine/Compiler/CommonRuntimeDefs.h" #include "../../../Shared/funcannotations.h" @@ -66,7 +67,6 @@ void init_hash_join_buff(int32_t* buff, void init_hash_join_buff_on_device(int32_t* buff, const int64_t entry_count, const int32_t invalid_slot_val); - #ifndef __CUDACC__ void init_baseline_hash_join_buff_32(int8_t* hash_join_buff, @@ -101,6 +101,7 @@ struct JoinChunk { const int8_t* col_buff; // actually from AbstractBuffer::getMemoryPtr() via Chunk_NS::Chunk size_t num_elems; + size_t row_id; }; struct JoinColumn { diff --git a/omniscidb/QueryEngine/JoinHashTable/Runtime/HashJoinRuntimeCpu.cpp b/omniscidb/QueryEngine/JoinHashTable/Runtime/HashJoinRuntimeCpu.cpp new file mode 100644 index 000000000..4508e2bc8 --- /dev/null +++ b/omniscidb/QueryEngine/JoinHashTable/Runtime/HashJoinRuntimeCpu.cpp @@ -0,0 +1,267 @@ +#include + +#include "HashJoinRuntimeCpu.h" +// #include "QueryEngine/CompareKeysInl.h" + +namespace { + +template +inline int64_t getElem(const int8_t* chunk_mem_ptr, size_t elem_size, size_t elem_ind) { + UNREACHABLE(); + return 0; +}; + +template <> +inline int64_t getElem(const int8_t* chunk_mem_ptr, + size_t elem_size, + size_t elem_ind) { + return fixed_width_small_date_decode_noinline(chunk_mem_ptr, + elem_size, + elem_size == 4 ? NULL_INT : NULL_SMALLINT, + elem_size == 4 ? NULL_INT : NULL_SMALLINT, + elem_ind); +} + +template <> +inline int64_t getElem(const int8_t* chunk_mem_ptr, + size_t elem_size, + size_t elem_ind) { + return fixed_width_int_decode_noinline(chunk_mem_ptr, elem_size, elem_ind); +} + +template <> +inline int64_t getElem(const int8_t* chunk_mem_ptr, + size_t elem_size, + size_t elem_ind) { + return fixed_width_unsigned_decode_noinline(chunk_mem_ptr, elem_size, elem_ind); +} + +template <> +inline int64_t getElem(const int8_t* chunk_mem_ptr, + size_t elem_size, + size_t elem_ind) { + return fixed_width_double_decode_noinline(chunk_mem_ptr, elem_ind); +} + +template +inline int64_t getElem(const int8_t* chunk_mem_ptr, size_t elem_ind) { + return getElem(chunk_mem_ptr, Elem, elem_ind); +} + +template +inline int apply_hash_table_elementwise_impl( + const tbb::blocked_range& elems_range, + const int8_t* chunk_mem_ptr, + size_t curr_chunk_row_offset, + const JoinColumnTypeInfo& type_info, + const int32_t* sd_inner_to_outer_translation_map, + const int32_t min_inner_elem, + HASHTABLE_FILLING_FUNC hashtable_filling_func) { + // DEBUG_TIMER("fill_hash_join_buff_bucketized_cpu raw_func"); + // INJECT_TIMER(raw_func); + // LOG(ERROR) << " num_elems threaded: " << elems_range.size(); + for (size_t elem_i = elems_range.begin(); elem_i != elems_range.end(); elem_i++) { + int64_t elem = getElem(chunk_mem_ptr, elem_i); + + if (elem == type_info.null_val) { + if (!type_info.uses_bw_eq) { + continue; + } + elem = type_info.translated_null_val; + } + + if (sd_inner_to_outer_translation_map && + (!type_info.uses_bw_eq || elem != type_info.translated_null_val)) { + const auto outer_id = + cpu_utils::map_str_id_to_outer_dict(elem, + min_inner_elem, + type_info.min_val, + type_info.max_val, + sd_inner_to_outer_translation_map); + if (outer_id == StringDictionary::INVALID_STR_ID) { + continue; + } + elem = outer_id; + } + + if (hashtable_filling_func(elem, curr_chunk_row_offset + elem_i)) { + return -1; + } + } + return 0; +} + +template +inline int apply_hash_table_elementwise(const tbb::blocked_range& elems_range, + const int8_t* chunk_mem_ptr, + size_t curr_chunk_row_offset, + const JoinColumnTypeInfo& type_info, + const int32_t* sd_inner_to_outer_translation_map, + const int32_t min_inner_elem, + HASHTABLE_FILLING_FUNC hashtable_filling_func) { + switch (type_info.elem_sz) { + case 1: + return apply_hash_table_elementwise_impl( + elems_range, + chunk_mem_ptr, + curr_chunk_row_offset, + type_info, + sd_inner_to_outer_translation_map, + min_inner_elem, + hashtable_filling_func); + case 2: + return apply_hash_table_elementwise_impl( + elems_range, + chunk_mem_ptr, + curr_chunk_row_offset, + type_info, + sd_inner_to_outer_translation_map, + min_inner_elem, + hashtable_filling_func); + case 4: + return apply_hash_table_elementwise_impl( + elems_range, + chunk_mem_ptr, + curr_chunk_row_offset, + type_info, + sd_inner_to_outer_translation_map, + min_inner_elem, + hashtable_filling_func); + case 8: + return apply_hash_table_elementwise_impl( + elems_range, + chunk_mem_ptr, + curr_chunk_row_offset, + type_info, + sd_inner_to_outer_translation_map, + min_inner_elem, + hashtable_filling_func); + default: + break; + } + UNREACHABLE(); + return 0; +} + +template +inline int apply_hash_table_elementwise(const tbb::blocked_range& elems_range, + const int8_t* chunk_mem_ptr, + size_t curr_chunk_row_offset, + const JoinColumnTypeInfo& type_info, + const int32_t* sd_inner_to_outer_translation_map, + const int32_t min_inner_elem, + HASHTABLE_FILLING_FUNC hashtable_filling_func) { + switch (type_info.column_type) { + case SmallDate: + return apply_hash_table_elementwise( + elems_range, + chunk_mem_ptr, + curr_chunk_row_offset, + type_info, + sd_inner_to_outer_translation_map, + min_inner_elem, + hashtable_filling_func); + case Signed: + return apply_hash_table_elementwise( + elems_range, + chunk_mem_ptr, + curr_chunk_row_offset, + type_info, + sd_inner_to_outer_translation_map, + min_inner_elem, + hashtable_filling_func); + case Unsigned: + return apply_hash_table_elementwise( + elems_range, + chunk_mem_ptr, + curr_chunk_row_offset, + type_info, + sd_inner_to_outer_translation_map, + min_inner_elem, + hashtable_filling_func); + case Double: + return apply_hash_table_elementwise( + elems_range, + chunk_mem_ptr, + curr_chunk_row_offset, + type_info, + sd_inner_to_outer_translation_map, + min_inner_elem, + hashtable_filling_func); + default: + break; + } + UNREACHABLE(); + return 0; +} + +} // namespace + +DEVICE int SUFFIX(fill_hash_join_buff_bucketized_cpu)( + int32_t* cpu_hash_table_buff, + const int32_t hash_join_invalid_val, + const bool for_semi_join, + const JoinColumn& join_column, + const JoinColumnTypeInfo& type_info, + const int32_t* sd_inner_to_outer_translation_map, + const int32_t min_inner_elem, + const int64_t bucket_normalization) { + auto filling_func = for_semi_join ? SUFFIX(fill_hashtable_for_semi_join) + : SUFFIX(fill_one_to_one_hashtable); + auto hashtable_filling_func = [&](int64_t elem, size_t index) { + auto entry_ptr = SUFFIX(get_bucketized_hash_slot)( + cpu_hash_table_buff, elem, type_info.min_val, bucket_normalization); + return filling_func(index, entry_ptr, hash_join_invalid_val); + }; + + // for some reason int8* ptr is actually JoinChunk* Why? + auto join_chunk_array = + reinterpret_cast(join_column.col_chunks_buff); + // BTW it's vector with sz: + // join_column.num_chunks + // const int8_t* chunk_mem_ptr = join_chunk_array->col_buff; + + // It's possible that 1 chunk, but 0 elements. + if (join_column.num_elems == 0) { + return 0; + } + + // This value is tuned to make range of elemnts + // handled in each thread spend about 10ms according to timers. + size_t data_to_handle_sz = 512 * 1024; + size_t granularity = data_to_handle_sz / type_info.elem_sz; + + std::atomic err{0}; + // LOG(ERROR) << "Num chunks: " << join_column.num_chunks; + tbb::parallel_for( + tbb::blocked_range(0, join_column.num_chunks), + [&](const tbb::blocked_range& join_chunks_range) { + DEBUG_TIMER("fill_hash_join_buff_bucketized_cpu chunk"); + for (size_t chunk_i = join_chunks_range.begin(); + chunk_i != join_chunks_range.end(); + chunk_i++) { + auto curr_chunk = join_chunk_array[chunk_i]; + // LOG(ERROR) << " num elems: " << curr_chunk.num_elems; + + tbb::parallel_for( + tbb::blocked_range(0, curr_chunk.num_elems, granularity), + [&](const tbb::blocked_range& curr_chnunk_elems_range) { + auto ret = apply_hash_table_elementwise(curr_chnunk_elems_range, + curr_chunk.col_buff, + curr_chunk.row_id, + type_info, + sd_inner_to_outer_translation_map, + min_inner_elem, + hashtable_filling_func); + if (ret != 0) { + int zero{0}; + err.compare_exchange_strong(zero, ret); + } + }); + } + }); + if (err) { + return -1; + } + return 0; +} diff --git a/omniscidb/QueryEngine/JoinHashTable/Runtime/HashJoinRuntimeCpu.h b/omniscidb/QueryEngine/JoinHashTable/Runtime/HashJoinRuntimeCpu.h new file mode 100644 index 000000000..aa37062f9 --- /dev/null +++ b/omniscidb/QueryEngine/JoinHashTable/Runtime/HashJoinRuntimeCpu.h @@ -0,0 +1,71 @@ +#pragma once + +#include +#include + +#include "../../../Shared/sqltypes.h" + +namespace cpu_utils { +/** + * Joins between two dictionary encoded string columns without a shared string dictionary + * are computed by translating the inner dictionary to the outer dictionary while filling + * the hash table. The translation works as follows: + * + * Given two tables t1 and t2, with t1 the outer table and t2 the inner table, and two + * columns t1.x and t2.x, both dictionary encoded strings without a shared dictionary, we + * read each value in t2.x and do a lookup in the dictionary for t1.x. If the lookup + * returns a valid ID, we insert that ID into the hash table. Otherwise, we skip adding an + * entry into the hash table for the inner column. We can also skip adding any entries + * that are outside the range of the outer column. + * + * Consider a join of the form SELECT x, n FROM (SELECT x, COUNT(*) n FROM t1 GROUP BY x + * HAVING n > 10), t2 WHERE t1.x = t2.x; Let the result of the subquery be t1_s. + * Due to the HAVING clause, the range of all IDs in t1_s must be less than or equal to + * the range of all IDs in t1. Suppose we have an element a in t2.x that is also in + * t1_s.x. Then the ID of a must be within the range of t1_s. Therefore it is safe to + * ignore any element ID that is not in the dictionary corresponding to t1_s.x or is + * outside the range of column t1_s. + */ + +inline int64_t translate_str_id_to_outer_dict(const int64_t elem, + const int64_t min_elem, + const int64_t max_elem, + const void* sd_inner_proxy, + const void* sd_outer_proxy) { + CHECK(sd_outer_proxy); + const auto sd_inner_dict_proxy = + static_cast(sd_inner_proxy); + const auto sd_outer_dict_proxy = + static_cast(sd_outer_proxy); + const auto elem_str = sd_inner_dict_proxy->getString(elem); + const auto outer_id = sd_outer_dict_proxy->getIdOfString(elem_str); + if (outer_id > max_elem || outer_id < min_elem) { + return StringDictionary::INVALID_STR_ID; + } + return outer_id; +}; + +inline int64_t map_str_id_to_outer_dict(const int64_t inner_elem, + const int64_t min_inner_elem, + const int64_t min_outer_elem, + const int64_t max_outer_elem, + const int32_t* inner_to_outer_translation_map) { + const auto outer_id = inner_to_outer_translation_map[inner_elem - min_inner_elem]; + if (outer_id > max_outer_elem || outer_id < min_outer_elem) { + return StringDictionary::INVALID_STR_ID; + } + return outer_id; +} +} // namespace cpu_utils + +struct JoinColumn; +struct JoinColumnTypeInfo; + +int fill_hash_join_buff_bucketized_cpu(int32_t* cpu_hash_table_buff, + const int32_t hash_join_invalid_val, + const bool for_semi_join, + const JoinColumn& join_column, + const JoinColumnTypeInfo& type_info, + const int32_t* sd_inner_to_outer_translation_map, + const int32_t min_inner_elem, + const int64_t bucket_normalization);