diff --git a/omniscidb/QueryEngine/ColumnFetcher.cpp b/omniscidb/QueryEngine/ColumnFetcher.cpp
index 20a73868f..1514d2dbd 100644
--- a/omniscidb/QueryEngine/ColumnFetcher.cpp
+++ b/omniscidb/QueryEngine/ColumnFetcher.cpp
@@ -16,8 +16,6 @@
 
 #include "QueryEngine/ColumnFetcher.h"
 
-#include <memory>
-
 #include "DataMgr/ArrayNoneEncoder.h"
 #include "QueryEngine/ErrorHandling.h"
 #include "QueryEngine/Execute.h"
@@ -25,6 +23,9 @@
 #include "Shared/likely.h"
 #include "Shared/sqltypes.h"
 
+#include <memory>
+#include <tbb/parallel_for.h>
+
 namespace {
 
 std::string getMemoryLevelString(Data_Namespace::MemoryLevel memoryLevel) {
@@ -239,8 +240,10 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments(
   int db_id = col_info->db_id;
   int table_id = col_info->table_id;
   int col_id = col_info->column_id;
+
   const auto fragments_it = all_tables_fragments.find({db_id, table_id});
   CHECK(fragments_it != all_tables_fragments.end());
+
   const auto fragments = fragments_it->second;
   const auto frag_count = fragments->size();
   std::vector<std::unique_ptr<ColumnarResults>> column_frags;
@@ -248,7 +251,6 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments(
   const InputDescriptor table_desc(db_id, table_id, int(0));
   {
     std::lock_guard<std::mutex> columnar_conversion_guard(columnar_fetch_mutex_);
-
     auto col_token = data_provider_->getZeroCopyColumnData(*col_info);
     if (col_token != nullptr) {
       size_t num_rows = col_token->getSize() / col_token->getType()->size();
@@ -262,44 +264,111 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments(
     }
 
     auto column_it = columnarized_scan_table_cache_.find({table_id, col_id});
-    if (column_it == columnarized_scan_table_cache_.end()) {
-      for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
-        if (executor_->getConfig()
-                .exec.interrupt.enable_non_kernel_time_query_interrupt &&
-            executor_->checkNonKernelTimeInterrupted()) {
-          throw QueryExecutionError(Executor::ERR_INTERRUPTED);
-        }
-        std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;
-        std::list<ChunkIter> chunk_iter_holder;
-        const auto& fragment = (*fragments)[frag_id];
-        if (fragment.isEmptyPhysicalFragment()) {
-          continue;
-        }
-        auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
-        CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
-        auto col_buffer = getOneTableColumnFragment(col_info,
-                                                    static_cast<int>(frag_id),
-                                                    all_tables_fragments,
-                                                    chunk_holder,
-                                                    chunk_iter_holder,
-                                                    Data_Namespace::CPU_LEVEL,
-                                                    int(0),
-                                                    device_allocator);
-        column_frags.push_back(
-            std::make_unique<ColumnarResults>(executor_->row_set_mem_owner_,
-                                              col_buffer,
-                                              fragment.getNumTuples(),
-                                              chunk_meta_it->second->type(),
-                                              thread_idx));
+    if (column_it != columnarized_scan_table_cache_.end()) {
+      table_column = column_it->second.get();
+      return ColumnFetcher::transferColumnIfNeeded(
+          table_column, 0, memory_level, device_id, device_allocator);
+    }
+
+    size_t total_row_count = 0;
+    for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
+      if (executor_->getConfig().exec.interrupt.enable_non_kernel_time_query_interrupt &&
+          executor_->checkNonKernelTimeInterrupted()) {
+        throw QueryExecutionError(Executor::ERR_INTERRUPTED);
       }
-      auto merged_results =
-          ColumnarResults::mergeResults(executor_->row_set_mem_owner_, column_frags);
+      const auto& fragment = (*fragments)[frag_id];
+      const auto& rows_in_frag = fragment.getNumTuples();
+      total_row_count += rows_in_frag;
+    }
+
+    const auto& type_width = col_info->type->size();
+    auto write_ptr =
+        executor_->row_set_mem_owner_->allocate(type_width * total_row_count);
+    std::vector<std::pair<int8_t*, size_t>> write_ptrs;
+    std::vector<size_t> valid_fragments;
+    for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
+      const auto& fragment = (*fragments)[frag_id];
+      if (fragment.isEmptyPhysicalFragment()) {
+        continue;
+      }
+      CHECK_EQ(type_width, fragment.getChunkMetadataMap().at(col_id)->type()->size());
+      write_ptrs.push_back({write_ptr, fragment.getNumTuples() * type_width});
+      write_ptr += fragment.getNumTuples() * type_width;
+      valid_fragments.push_back(frag_id);
+    }
+
+    if (write_ptrs.empty()) {
+      std::unique_ptr<ColumnarResults> merged_results(nullptr);
+      table_column = merged_results.get();
       columnarized_scan_table_cache_.emplace(std::make_pair(table_id, col_id),
                                              std::move(merged_results));
-    } else {
-      table_column = column_it->second.get();
+
+      return ColumnFetcher::transferColumnIfNeeded(
+          table_column, 0, memory_level, device_id, device_allocator);
     }
+
+    size_t valid_frag_count = valid_fragments.size();
+    tbb::parallel_for(
+        tbb::blocked_range<size_t>(0, valid_frag_count),
+        [&](const tbb::blocked_range<size_t>& frag_ids) {
+          for (size_t v_frag_id = frag_ids.begin(); v_frag_id < frag_ids.end();
+               ++v_frag_id) {
+            std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;
+            std::list<ChunkIter> chunk_iter_holder;
+            size_t frag_id = valid_fragments[v_frag_id];
+            const auto& fragment = (*fragments)[frag_id];
+            auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
+            CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
+            std::shared_ptr<Chunk_NS::Chunk> chunk;
+            // Fixed length arrays are also included here.
+            const bool is_varlen =
+                col_info->type->isString() || col_info->type->isArray();
+            int8_t* col_buffer;
+            {
+              ChunkKey chunk_key{
+                  db_id, fragment.physicalTableId, col_id, fragment.fragmentId};
+              std::unique_ptr<std::lock_guard<std::mutex>> varlen_chunk_lock;
+              if (is_varlen) {
+                varlen_chunk_lock.reset(
+                    new std::lock_guard<std::mutex>(varlen_chunk_fetch_mutex_));
+              }
+              chunk = data_provider_->getChunk(col_info,
+                                               chunk_key,
+                                               Data_Namespace::CPU_LEVEL,
+                                               0,
+                                               chunk_meta_it->second->numBytes(),
+                                               chunk_meta_it->second->numElements());
+              std::lock_guard<std::mutex> chunk_list_lock(chunk_list_mutex_);
+              chunk_holder.push_back(chunk);
+            }
+            if (is_varlen) {
+              CHECK_GT(table_id, 0);
+              CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
+              chunk_iter_holder.push_back(chunk->begin_iterator(chunk_meta_it->second));
+              auto& chunk_iter = chunk_iter_holder.back();
+              col_buffer = reinterpret_cast<int8_t*>(&chunk_iter);
+            } else {
+              auto ab = chunk->getBuffer();
+              CHECK(ab->getMemoryPtr());
+              col_buffer = ab->getMemoryPtr();  // @TODO(alex) change to use ChunkIter
+            }
+            memcpy(write_ptrs[frag_id].first, col_buffer, write_ptrs[frag_id].second);
+          }
+        });
+
+    std::vector<int8_t*> raw_write_ptrs;
+    raw_write_ptrs.reserve(frag_count);
+    for (size_t i = 0; i < frag_count; i++) {
+      raw_write_ptrs.emplace_back(write_ptrs[i].first);
+    }
+
+    std::unique_ptr<ColumnarResults> merged_results(new ColumnarResults(
+        std::move(raw_write_ptrs), total_row_count, col_info->type, thread_idx));
+
+    table_column = merged_results.get();
+    columnarized_scan_table_cache_.emplace(std::make_pair(table_id, col_id),
+                                           std::move(merged_results));
   }
   return ColumnFetcher::transferColumnIfNeeded(
       table_column, 0, memory_level, device_id, device_allocator);
diff --git a/omniscidb/QueryEngine/JoinHashTable/Runtime/JoinColumnIterator.h b/omniscidb/QueryEngine/JoinHashTable/Runtime/JoinColumnIterator.h
index 063c7d3ff..d3027127b 100644
--- a/omniscidb/QueryEngine/JoinHashTable/Runtime/JoinColumnIterator.h
+++ b/omniscidb/QueryEngine/JoinHashTable/Runtime/JoinColumnIterator.h
@@ -73,6 +73,7 @@ struct JoinColumnIterator {
   DEVICE FORCE_INLINE JoinColumnIterator& operator++() {
     index += step;
     index_inside_chunk += step;
+    // this loop is made to find index_of_chunk by total index of element
    while (chunk_data &&
           index_inside_chunk >= join_chunk_array[index_of_chunk].num_elems) {
      index_inside_chunk -= join_chunk_array[index_of_chunk].num_elems;
diff --git a/omniscidb/ResultSetRegistry/ColumnarResults.cpp b/omniscidb/ResultSetRegistry/ColumnarResults.cpp
index d562aa186..a22924e85 100644
--- a/omniscidb/ResultSetRegistry/ColumnarResults.cpp
+++ b/omniscidb/ResultSetRegistry/ColumnarResults.cpp
@@ -122,28 +122,6 @@ ColumnarResults::ColumnarResults(std::shared_ptr<RowSetMemoryOwner> row_set_mem_
   }
 }
 
-ColumnarResults::ColumnarResults(std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
-                                 const int8_t* one_col_buffer,
-                                 const size_t num_rows,
-                                 const hdk::ir::Type* target_type,
-                                 const size_t thread_idx)
-    : column_buffers_(1)
-    , num_rows_(num_rows)
-    , target_types_{target_type}
-    , parallel_conversion_(false)
-    , direct_columnar_conversion_(false)
-    , thread_idx_(thread_idx) {
-  auto timer = DEBUG_TIMER(__func__);
-
-  if (target_type->isVarLen()) {
-    throw ColumnarConversionNotSupported();
-  }
-  const auto buf_size = num_rows * target_type->size();
-  column_buffers_[0] =
-      reinterpret_cast<int8_t*>(row_set_mem_owner->allocate(buf_size, thread_idx_));
-  memcpy(((void*)column_buffers_[0]), one_col_buffer, buf_size);
-}
-
 ColumnarResults::ColumnarResults(const std::vector<int8_t*> one_col_buffer,
                                  const size_t num_rows,
                                  const hdk::ir::Type* target_type,
diff --git a/omniscidb/ResultSetRegistry/ColumnarResults.h b/omniscidb/ResultSetRegistry/ColumnarResults.h
index ced7ac30e..91eb97dc8 100644
--- a/omniscidb/ResultSetRegistry/ColumnarResults.h
+++ b/omniscidb/ResultSetRegistry/ColumnarResults.h
@@ -67,12 +67,6 @@ class ColumnarResults {
                   const Config& config,
                   const bool is_parallel_execution_enforced = false);
 
-  ColumnarResults(const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
-                  const int8_t* one_col_buffer,
-                  const size_t num_rows,
-                  const hdk::ir::Type* target_type,
-                  const size_t thread_idx);
-
   ColumnarResults(const std::vector<int8_t*> one_col_buffer,
                   const size_t num_rows,
                   const hdk::ir::Type* target_type,
diff --git a/python/pyhdk/hdk.py b/python/pyhdk/hdk.py
index 005f6c386..98f3dc476 100644
--- a/python/pyhdk/hdk.py
+++ b/python/pyhdk/hdk.py
@@ -2899,6 +2899,9 @@ def if_then_else(self, cond, true_val, false_val):
         """
         return self._builder.if_then_else(cond, true_val, false_val)
 
+    def clear_cache(self):
+        self._executor.clearMemory(self._data_mgr, 1)
+
 
 def init(**kwargs):
     if init._instance is None: