Skip to content
This repository has been archived by the owner on May 9, 2024. It is now read-only.

Commit

Permalink
s
Browse files Browse the repository at this point in the history
  • Loading branch information
Devjiu committed Jul 11, 2023
1 parent b0bc82a commit 923791c
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 14 deletions.
13 changes: 12 additions & 1 deletion omniscidb/ArrowStorage/ArrowStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,12 @@ void ArrowStorage::fetchBuffer(const ChunkKey& key,
const size_t num_bytes) {
auto timer = DEBUG_TIMER("ArrowStorage Fetch Buffer");
mapd_shared_lock<mapd_shared_mutex> data_lock(data_mutex_);
auto timer2 = DEBUG_TIMER("ArrowStorage Data Lock taken");
CHECK_EQ(key[CHUNK_KEY_DB_IDX], db_id_);
CHECK_EQ(tables_.count(key[CHUNK_KEY_TABLE_IDX]), (size_t)1);
auto& table = *tables_.at(key[CHUNK_KEY_TABLE_IDX]);
mapd_shared_lock<mapd_shared_mutex> table_lock(table.mutex);
auto timer3 = DEBUG_TIMER("ArrowStorage Table Lock taken");
data_lock.unlock();

size_t col_idx = columnIndex(key[CHUNK_KEY_COLUMN_IDX]);
Expand All @@ -108,17 +110,25 @@ void ArrowStorage::fetchBuffer(const ChunkKey& key,
if (!col_type->isVarLen()) {
CHECK_EQ(key.size(), (size_t)4);
size_t elem_size = col_type->size();
fetchFixedLenData(table, frag_idx, col_idx, dest, num_bytes, elem_size);
if (col_type->isString()) {
auto timer4 = DEBUG_TIMER("ArrowStorage Str Fix len Fetch");
fetchFixedLenData(table, frag_idx, col_idx, dest, num_bytes, elem_size);
} else {
auto timer4 = DEBUG_TIMER("ArrowStorage NotStr Fix len Fetch");
fetchFixedLenData(table, frag_idx, col_idx, dest, num_bytes, elem_size);
}
} else {
CHECK_EQ(key.size(), (size_t)5);
if (key[CHUNK_KEY_VARLEN_IDX] == 1) {
if (!dest->hasEncoder()) {
dest->initEncoder(col_type);
}
if (col_type->isString()) {
auto timer4 = DEBUG_TIMER("ArrowStorage Str Var len Fetch");
fetchVarLenData(table, frag_idx, col_idx, dest, num_bytes);
} else {
CHECK(col_type->isVarLenArray());
auto timer4 = DEBUG_TIMER("ArrowStorage NotStr Var len Fetch");
fetchVarLenArrayData(table,
frag_idx,
col_idx,
Expand All @@ -128,6 +138,7 @@ void ArrowStorage::fetchBuffer(const ChunkKey& key,
}
} else {
CHECK_EQ(key[CHUNK_KEY_VARLEN_IDX], 2);
auto timer4 = DEBUG_TIMER("ArrowStorage Off Var len Fetch");
fetchVarLenOffsets(table, frag_idx, col_idx, dest, num_bytes);
}
}
Expand Down
5 changes: 3 additions & 2 deletions omniscidb/DataMgr/Chunk/Chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ std::shared_ptr<Chunk> Chunk::getChunk(ColumnInfoPtr col_info,
std::shared_ptr<Chunk> chunkp = std::make_shared<Chunk>(Chunk(col_info));
chunkp->getChunkBuffer(data_mgr, key, memoryLevel, deviceId, numBytes, numElems);
{
auto timer2 = DEBUG_TIMER("ChunkNS getChunk ");
LOG(WARNING) << "[Debug Timer] " << timer2 << " col_info: " << col_info->toString();
// auto timer2 = DEBUG_TIMER("ChunkNS getChunk ");
timer.stop();
LOG(WARNING) << "[Debug Timer] col_info: " << col_info->toString();
}
return chunkp;
}
Expand Down
36 changes: 27 additions & 9 deletions omniscidb/QueryEngine/ColumnFetcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "QueryEngine/ColumnFetcher.h"

#include <tbb/parallel_for.h>
#include <memory>

#include "DataMgr/ArrayNoneEncoder.h"
Expand Down Expand Up @@ -172,7 +173,8 @@ const int8_t* ColumnFetcher::getOneTableColumnFragment(
std::list<ChunkIter>& chunk_iter_holder,
const Data_Namespace::MemoryLevel memory_level,
const int device_id,
DeviceAllocator* allocator) const {
DeviceAllocator* allocator,
const size_t thread_idx) const {
auto timer = DEBUG_TIMER(__func__);
INJECT_TIMER(getOneTableColumnFragment);
int db_id = col_info->db_id;
Expand All @@ -186,6 +188,10 @@ const int8_t* ColumnFetcher::getOneTableColumnFragment(
if (fragment.isEmptyPhysicalFragment()) {
return nullptr;
}
LOG(ERROR) << __func__ << " lock taken, execution started parent thread: " << thread_idx
<< "\n logger curr tid: " << logger::thread_id()
<< "\n scan_table tabid: " << table_id << " colid: " << col_id
<< "\n col_info: " << col_info->toString();
std::shared_ptr<Chunk_NS::Chunk> chunk;
auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
Expand Down Expand Up @@ -254,11 +260,17 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments(
const auto fragments = fragments_it->second;
const auto frag_count = fragments->size();
std::vector<std::unique_ptr<ColumnarResults>> column_frags;
std::vector<ColumnarDataRefence> column_frags_raw;

const ColumnarResults* table_column = nullptr;
const InputDescriptor table_desc(db_id, table_id, int(0));
{
std::lock_guard<std::mutex> columnar_conversion_guard(columnar_fetch_mutex_);
auto timer = DEBUG_TIMER("lock taken, execution started");
LOG(ERROR) << __func__
<< " lock taken, execution started parent thread: " << thread_idx
<< "\n logger curr tid: " << logger::thread_id()
<< "\n scan_table tabid: " << table_id << " colid: " << col_id;
auto column_it = columnarized_scan_table_cache_.find({table_id, col_id});
if (column_it == columnarized_scan_table_cache_.end()) {
for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
Expand All @@ -282,22 +294,28 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments(
chunk_iter_holder,
Data_Namespace::CPU_LEVEL,
int(0),
device_allocator);
column_frags.push_back(
std::make_unique<ColumnarResults>(executor_->row_set_mem_owner_,
col_buffer,
fragment.getNumTuples(),
chunk_meta_it->second->type(),
thread_idx));
device_allocator,
thread_idx);
column_frags_raw.push_back(
{col_buffer, fragment.getNumTuples(), chunk_meta_it->second->type()});
// column_frags.push_back(
// std::make_unique<ColumnarResults>(executor_->row_set_mem_owner_,
// col_buffer,
// fragment.getNumTuples(),
// chunk_meta_it->second->type(),
// thread_idx));
}
// auto merged_results =
// ColumnarResults::mergeResults(executor_->row_set_mem_owner_, column_frags);
auto merged_results =
ColumnarResults::mergeResults(executor_->row_set_mem_owner_, column_frags);
ColumnarResults::mergeResults(executor_->row_set_mem_owner_, column_frags_raw);
table_column = merged_results.get();
columnarized_scan_table_cache_.emplace(std::make_pair(table_id, col_id),
std::move(merged_results));
} else {
table_column = column_it->second.get();
}
timer.stop();
}
return ColumnFetcher::transferColumnIfNeeded(
table_column, 0, memory_level, device_id, device_allocator);
Expand Down
3 changes: 2 additions & 1 deletion omniscidb/QueryEngine/ColumnFetcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ class ColumnFetcher {
std::list<ChunkIter>& chunk_iter_holder,
const Data_Namespace::MemoryLevel memory_level,
const int device_id,
DeviceAllocator* device_allocator) const;
DeviceAllocator* device_allocator,
const size_t thread_idx = 0) const;

const int8_t* getAllTableColumnFragments(
ColumnInfoPtr col_info,
Expand Down
1 change: 0 additions & 1 deletion omniscidb/QueryEngine/Execute.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2892,7 +2892,6 @@ void Executor::launchKernels(SharedKernelContext& shared_context,
&shared_context,
parent_thread_id = logger::thread_id(),
crt_kernel_idx = kernel_idx++] {
DEBUG_TIMER_NEW_THREAD(parent_thread_id);
const size_t thread_i = crt_kernel_idx % cpu_threads();
kernel->run(this, thread_i, shared_context);
});
Expand Down
74 changes: 74 additions & 0 deletions omniscidb/ResultSetRegistry/ColumnarResults.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,32 @@ ColumnarResults::ColumnarResults(std::shared_ptr<RowSetMemoryOwner> row_set_mem_
const auto buf_size = num_rows * target_type->size();
column_buffers_[0] =
reinterpret_cast<int8_t*>(row_set_mem_owner->allocate(buf_size, thread_idx_));

// HERE!!!
LOG(ERROR) << "HERE!!!";
memcpy(((void*)column_buffers_[0]), one_col_buffer, buf_size);
}

ColumnarResults::ColumnarResults(std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
const int8_t* one_col_buffer,
const size_t num_rows,
const hdk::ir::Type* target_type,
const size_t thread_idx,
bool just_set)
: column_buffers_(1)
, num_rows_(num_rows)
, target_types_{target_type}
, parallel_conversion_(false)
, direct_columnar_conversion_(false)
, thread_idx_(thread_idx) {
auto timer = DEBUG_TIMER(__func__);
const bool is_varlen = target_type->isArray() || target_type->isString();
if (is_varlen) {
throw ColumnarConversionNotSupported();
}
column_buffers_[0] = const_cast<int8_t*>(one_col_buffer);
}

std::unique_ptr<ColumnarResults> ColumnarResults::mergeResults(
std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
const std::vector<std::unique_ptr<ColumnarResults>>& sub_results) {
Expand All @@ -159,6 +182,7 @@ std::unique_ptr<ColumnarResults> ColumnarResults::mergeResults(
if (nonempty_it == sub_results.end()) {
return nullptr;
}
LOG(ERROR) << "col_count: " << col_count;
for (size_t col_idx = 0; col_idx < col_count; ++col_idx) {
const auto byte_width = (*nonempty_it)->columnType(col_idx)->size();
auto write_ptr = row_set_mem_owner->allocate(byte_width * total_row_count);
Expand All @@ -169,13 +193,63 @@ std::unique_ptr<ColumnarResults> ColumnarResults::mergeResults(
continue;
}
CHECK_EQ(byte_width, rs->columnType(col_idx)->size());
// HERE!!!
LOG(ERROR) << "HERE!!!";
memcpy(write_ptr, rs->column_buffers_[col_idx], rs->size() * byte_width);
write_ptr += rs->size() * byte_width;
}
}
return merged_results;
}

std::unique_ptr<ColumnarResults> ColumnarResults::mergeResults(
std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
const std::vector<ColumnarDataRefence>& sub_results) {
auto timer = DEBUG_TIMER(__func__);
INJECT_TIMER(mergeResults);

if (sub_results.empty()) {
return nullptr;
}
const auto total_row_count =
std::accumulate(sub_results.begin(),
sub_results.end(),
size_t(0),
[](const size_t init, const ColumnarDataRefence& result) {
return init + result.num_rows;
});

// std::unique_ptr<ColumnarResults> merged_results(
// new ColumnarResults(row_set_mem_owner, total_row_count,
// sub_results[0].target_type));
const auto nonempty_it = std::find_if(
sub_results.begin(), sub_results.end(), [](const ColumnarDataRefence& needle) {
return needle.num_rows;
});
if (nonempty_it == sub_results.end()) {
return nullptr;
}
const auto byte_width = (*nonempty_it).target_type->size();
auto write_ptr = row_set_mem_owner->allocate(byte_width * total_row_count);
// merged_results->column_buffers_.push_back(write_ptr);
for (auto& rs : sub_results) {
if (!rs.num_rows) {
continue;
}
CHECK_EQ(byte_width, rs.target_type->size());
// HERE!!!
LOG(ERROR) << "HERE!!!";
memcpy(write_ptr, rs.one_col_buffer, rs.num_rows * byte_width);
write_ptr += rs.num_rows * byte_width;
}
return std::unique_ptr<ColumnarResults>(new ColumnarResults(row_set_mem_owner,
write_ptr,
total_row_count,
(*nonempty_it).target_type,
0,
false));
}

void ColumnarResults::materializeAllGroupbyColumnsThroughIteration(
const ResultSet& rows,
const size_t num_columns) {
Expand Down
16 changes: 16 additions & 0 deletions omniscidb/ResultSetRegistry/ColumnarResults.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ class ColumnBitmap {
std::vector<std::vector<bool>> bitmaps_;
};

struct ColumnarDataRefence {
const int8_t* one_col_buffer;
const size_t num_rows;
const hdk::ir::Type* target_type;
};

class ColumnarResults {
public:
ColumnarResults(const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
Expand All @@ -77,6 +83,10 @@ class ColumnarResults {
const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
const std::vector<std::unique_ptr<ColumnarResults>>& sub_results);

static std::unique_ptr<ColumnarResults> mergeResults(
std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
const std::vector<ColumnarDataRefence>& sub_results);

const std::vector<int8_t*>& getColumnBuffers() const { return column_buffers_; }
const std::vector<int8_t*>& getOffsetBuffers() const { return offset_buffers_; }

Expand Down Expand Up @@ -115,6 +125,12 @@ class ColumnarResults {
ColumnarResults(const size_t num_rows,
const std::vector<const hdk::ir::Type*>& target_types)
: num_rows_(num_rows), target_types_(target_types) {}
ColumnarResults(const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
const int8_t* one_col_buffer,
const size_t num_rows,
const hdk::ir::Type* target_type,
const size_t thread_idx,
bool just_set);
inline void writeBackCell(const TargetValue& col_val,
const size_t row_idx,
const size_t column_idx);
Expand Down

0 comments on commit 923791c

Please sign in to comment.