Skip to content
This repository has been archived by the owner on May 9, 2024. It is now read-only.

Commit

Permalink
[Join] Zero-copy storage column check.
Browse files Browse the repository at this point in the history
This commit adds a check for a valid pointer to the column buffer so that
copying can be skipped. It is most effective when the
`enable-non-lazy-data-import` option is enabled. It also checks the number of chunks in storage.

Partially resolves: #574

Signed-off-by: Dmitrii Makarenko <[email protected]>
  • Loading branch information
Devjiu authored and ienkovich committed Aug 23, 2023
1 parent b55f9df commit 3ac48fe
Show file tree
Hide file tree
Showing 18 changed files with 157 additions and 18 deletions.
45 changes: 45 additions & 0 deletions omniscidb/ArrowStorage/ArrowStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,51 @@ std::unique_ptr<AbstractDataToken> ArrowStorage::getZeroCopyBufferMemory(
return nullptr;
}

// Tries to hand out the data of a whole column without copying it.
//
// Returns a token wrapping the single Arrow chunk that backs the column, or
// nullptr when the column type is variable-length, the column index is outside
// table.col_data, or the column is stored in a number of chunks other than one.
std::unique_ptr<AbstractDataToken> ArrowStorage::getZeroCopyColumnData(
    const ColumnRef& col_ref) {
  // The storage-wide lock is held only until the table has been found and its
  // own lock has been taken.
  mapd_shared_lock<mapd_shared_mutex> data_lock(data_mutex_);
  CHECK_EQ(col_ref.db_id, db_id_);
  CHECK_EQ(tables_.count(col_ref.table_id), (size_t)1);
  auto& table = *tables_.at(col_ref.table_id);
  mapd_shared_lock<mapd_shared_mutex> table_lock(table.mutex);
  data_lock.unlock();

  auto col_type = getColumnInfo(col_ref.db_id, col_ref.table_id, col_ref.column_id)->type;

  if (col_type->isExtDictionary()) {
    // this will force materialize the dictionary. it is thread safe
    auto dict_descriptor =
        getDictMetadata(col_type->as<hdk::ir::ExtDictionaryType>()->dictId());
    CHECK(dict_descriptor);
  }

  // Only fixed-length columns are candidates for zero-copy access.
  if (col_type->isVarLen()) {
    return nullptr;
  }

  const size_t col_idx = columnIndex(col_ref.column_id);
  if (col_idx >= table.col_data.size()) {
    return nullptr;
  }

  const size_t elem_size = col_type->size();
  const auto& data_to_fetch = table.col_data[col_idx];
  const auto* fixed_type =
      dynamic_cast<const arrow::FixedWidthType*>(data_to_fetch->type().get());
  CHECK(fixed_type) << table.col_data[col_idx]->type()->ToString() << " (table "
                    << col_ref.table_id << ", column " << col_idx << ")";
  const size_t arrow_elem_size = fixed_type->bit_width() / 8;
  // Number of Arrow elements that make up one value of the column type.
  const size_t elems = elem_size / arrow_elem_size;
  CHECK_GT(elems, (size_t)0);

  // A column split over several chunks is not contiguous, so no single
  // pointer can be returned for it.
  if (data_to_fetch->num_chunks() != 1) {
    return nullptr;
  }

  auto chunk = data_to_fetch->chunk(0);
  const auto& array_data = chunk->data();
  // Buffer 1 is read at a byte offset derived from the array's element offset.
  const int8_t* ptr =
      array_data->GetValues<int8_t>(1, array_data->offset * arrow_elem_size);
  const size_t chunk_size = chunk->length() * arrow_elem_size;
  // The token takes over the chunk handle together with the raw pointer.
  return std::make_unique<ArrowChunkDataToken>(
      std::move(chunk), col_type, ptr, chunk_size);
}

void ArrowStorage::fetchFixedLenData(const TableData& table,
size_t frag_idx,
size_t col_idx,
Expand Down
3 changes: 3 additions & 0 deletions omniscidb/ArrowStorage/ArrowStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ class ArrowStorage : public SimpleSchemaProvider, public AbstractDataProvider {
const ChunkKey& key,
size_t num_bytes) override;

// Returns a token giving zero-copy access to the data of the column
// referenced by col_ref, or nullptr when the column cannot be exposed
// without copying.
std::unique_ptr<Data_Namespace::AbstractDataToken> getZeroCopyColumnData(
const ColumnRef& col_ref) override;

TableFragmentsInfo getTableMetadata(int db_id, int table_id) const override;

const DictDescriptor* getDictMetadata(int dict_id, bool load_dict = true) override;
Expand Down
2 changes: 2 additions & 0 deletions omniscidb/DataMgr/AbstractBufferMgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ class AbstractBufferMgr {
virtual AbstractBuffer* getBuffer(const ChunkKey& key, const size_t numBytes = 0) = 0;
virtual std::unique_ptr<AbstractDataToken> getZeroCopyBufferMemory(const ChunkKey& key,
size_t numBytes) = 0;
// Returns a token for zero-copy access to the data of the column referenced
// by col_ref.
// NOTE(review): the implementations visible in this change return nullptr
// when zero-copy access is unavailable -- confirm that this is the contract
// every buffer manager is expected to follow.
virtual std::unique_ptr<AbstractDataToken> getZeroCopyColumnData(
const ColumnRef& col_ref) = 0;
virtual void fetchBuffer(const ChunkKey& key,
AbstractBuffer* destBuffer,
const size_t numBytes = 0) = 0;
Expand Down
7 changes: 7 additions & 0 deletions omniscidb/DataMgr/AbstractDataProvider.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ class AbstractDataProvider : public Data_Namespace::AbstractBufferMgr {
return nullptr;
}

// Default behaviour for data providers: no zero-copy column access is
// offered, so an empty token is returned.
// TODO(dmitriim) remove this method after enabling
// of hashtable, that takes into account frag_id and offset
std::unique_ptr<Data_Namespace::AbstractDataToken> getZeroCopyColumnData(
    const ColumnRef& col_ref) override {
  return {};
}

void deleteBuffer(const ChunkKey& key, const bool purge = true) override {
UNREACHABLE();
}
Expand Down
5 changes: 5 additions & 0 deletions omniscidb/DataMgr/BufferMgr/BufferMgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,11 @@ std::unique_ptr<AbstractDataToken> BufferMgr::getZeroCopyBufferMemory(const Chun
return parent_mgr_->getZeroCopyBufferMemory(key, numBytes);
}

std::unique_ptr<AbstractDataToken> BufferMgr::getZeroCopyColumnData(
const ColumnRef& col_ref) {
return parent_mgr_->getZeroCopyColumnData(col_ref);
}

MemoryInfo BufferMgr::getMemoryInfo() {
std::unique_lock<std::mutex> sized_segs_lock(sized_segs_mutex_);
MemoryInfo mi;
Expand Down
3 changes: 3 additions & 0 deletions omniscidb/DataMgr/BufferMgr/BufferMgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ class BufferMgr : public AbstractBufferMgr { // implements
std::unique_ptr<AbstractDataToken> getZeroCopyBufferMemory(const ChunkKey& key,
size_t numBytes) override;

// Returns a token for zero-copy access to the column referenced by col_ref.
// The definition in BufferMgr.cpp delegates the request to the parent manager.
std::unique_ptr<AbstractDataToken> getZeroCopyColumnData(
const ColumnRef& col_ref) override;

/**
* @brief Puts the contents of d into the Buffer with ChunkKey key.
* @param key - Unique identifier for a Chunk.
Expand Down
7 changes: 7 additions & 0 deletions omniscidb/DataMgr/DataMgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,13 @@ AbstractBuffer* DataMgr::getChunkBuffer(const ChunkKey& key,
return bufferMgrs_[level][deviceId]->getBuffer(key, numBytes);
}

// Asks the CPU-level buffer manager of device 0 for zero-copy access to the
// column referenced by col_ref and returns whatever token it produces.
std::unique_ptr<AbstractDataToken> DataMgr::getZeroCopyColumnData(
    const ColumnRef& col_ref) {
  const size_t level = static_cast<size_t>(Data_Namespace::CPU_LEVEL);
  CHECK_LT(level, levelSizes_.size());  // make sure we have a legit buffermgr
  auto& cpu_buffer_mgr = bufferMgrs_[level][0];
  return cpu_buffer_mgr->getZeroCopyColumnData(col_ref);
}

void DataMgr::deleteChunksWithPrefix(const ChunkKey& keyPrefix) {
int numLevels = bufferMgrs_.size();
for (int level = numLevels - 1; level >= 0; --level) {
Expand Down
2 changes: 2 additions & 0 deletions omniscidb/DataMgr/DataMgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ class DataMgr {
const MemoryLevel memoryLevel,
const int deviceId = 0,
const size_t numBytes = 0);
// Returns a token for zero-copy access to the column referenced by col_ref.
// The definition in DataMgr.cpp forwards the request to the CPU-level buffer
// manager of device 0.
// TODO(dmitriim) remove this method after enabling of hashtable
std::unique_ptr<AbstractDataToken> getZeroCopyColumnData(const ColumnRef& col_ref);
void deleteChunksWithPrefix(const ChunkKey& keyPrefix);
void deleteChunksWithPrefix(const ChunkKey& keyPrefix, const MemoryLevel memLevel);
AbstractBuffer* alloc(const MemoryLevel memoryLevel,
Expand Down
7 changes: 7 additions & 0 deletions omniscidb/DataMgr/DataMgrDataProvider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,16 @@ std::shared_ptr<Chunk_NS::Chunk> DataMgrDataProvider::getChunk(
return Chunk_NS::Chunk::getChunk(
col_info, data_mgr_, key, memory_level, device_id, num_bytes, num_elems);
}

// Thin adapter: the zero-copy column request is answered by the underlying
// data manager and its token is returned as is.
std::unique_ptr<Data_Namespace::AbstractDataToken>
DataMgrDataProvider::getZeroCopyColumnData(const ColumnRef& col_ref) {
  auto column_token = data_mgr_->getZeroCopyColumnData(col_ref);
  return column_token;
}

// Thin adapter: fragment metadata for table (db_id, table_id) is obtained
// from the underlying data manager.
TableFragmentsInfo DataMgrDataProvider::getTableMetadata(int db_id, int table_id) const {
  auto fragments_info = data_mgr_->getTableMetadata(db_id, table_id);
  return fragments_info;
}

const DictDescriptor* DataMgrDataProvider::getDictMetadata(int dict_id,
bool load_dict) const {
return data_mgr_->getDictMetadata(dict_id, load_dict);
Expand Down
3 changes: 3 additions & 0 deletions omniscidb/DataMgr/DataMgrDataProvider.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ class DataMgrDataProvider : public DataProvider {
const size_t num_bytes,
const size_t num_elems) override;

// Returns a token for zero-copy access to the column referenced by col_ref.
// The definition in DataMgrDataProvider.cpp forwards the request to the
// wrapped data manager.
std::unique_ptr<Data_Namespace::AbstractDataToken> getZeroCopyColumnData(
const ColumnRef& col_ref) override;

TableFragmentsInfo getTableMetadata(int db_id, int table_id) const override;

const DictDescriptor* getDictMetadata(int dict_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ std::unique_ptr<AbstractDataToken> PersistentStorageMgr::getZeroCopyBufferMemory
return getStorageMgrForTableKey(key)->getZeroCopyBufferMemory(key, numBytes);
}

std::unique_ptr<AbstractDataToken> PersistentStorageMgr::getZeroCopyColumnData(
const ColumnRef& col_ref) {
return getStorageMgr(col_ref.db_id)->getZeroCopyColumnData(col_ref);
}

void PersistentStorageMgr::fetchBuffer(const ChunkKey& chunk_key,
AbstractBuffer* destination_buffer,
const size_t num_bytes) {
Expand Down
2 changes: 2 additions & 0 deletions omniscidb/DataMgr/PersistentStorageMgr/PersistentStorageMgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ class PersistentStorageMgr : public AbstractBufferMgr {
AbstractBuffer* getBuffer(const ChunkKey& chunk_key, const size_t num_bytes) override;
std::unique_ptr<AbstractDataToken> getZeroCopyBufferMemory(const ChunkKey& key,
size_t numBytes) override;
// Returns a token for zero-copy access to the column referenced by col_ref.
// The definition selects the storage manager by col_ref.db_id and delegates
// the request to it.
std::unique_ptr<AbstractDataToken> getZeroCopyColumnData(
const ColumnRef& col_ref) override;
void fetchBuffer(const ChunkKey& chunk_key,
AbstractBuffer* destination_buffer,
const size_t num_bytes) override;
Expand Down
5 changes: 5 additions & 0 deletions omniscidb/DataProvider/DataProvider.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ class DataProvider {
const size_t num_bytes,
const size_t num_elems) = 0;

// Returns a token for zero-copy access to the data of the column referenced
// by col_ref.
// NOTE(review): callers visible in this change treat a nullptr result as
// "zero-copy not available" and fall back to copying -- confirm that every
// implementation follows this convention.
// CPU only
// TODO(dmitriim) remove this method after enabling of hashtable
virtual std::unique_ptr<Data_Namespace::AbstractDataToken> getZeroCopyColumnData(
const ColumnRef& col_ref) = 0;

virtual TableFragmentsInfo getTableMetadata(int db_id, int table_id) const = 0;

virtual const DictDescriptor* getDictMetadata(int dict_id,
Expand Down
37 changes: 24 additions & 13 deletions omniscidb/QueryEngine/ColumnFetcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,22 +213,20 @@ const int8_t* ColumnFetcher::getOneTableColumnFragment(
auto& chunk_iter = chunk_iter_holder.back();
if (memory_level == Data_Namespace::CPU_LEVEL) {
return reinterpret_cast<int8_t*>(&chunk_iter);
} else {
auto ab = chunk->getBuffer();
auto& row_set_mem_owner = executor_->getRowSetMemoryOwner();
row_set_mem_owner->addVarlenInputBuffer(ab);
CHECK_EQ(Data_Namespace::GPU_LEVEL, memory_level);
CHECK(allocator);
auto chunk_iter_gpu = allocator->alloc(sizeof(ChunkIter));
allocator->copyToDevice(
chunk_iter_gpu, reinterpret_cast<int8_t*>(&chunk_iter), sizeof(ChunkIter));
return chunk_iter_gpu;
}
} else {
auto ab = chunk->getBuffer();
CHECK(ab->getMemoryPtr());
return ab->getMemoryPtr(); // @TODO(alex) change to use ChunkIter
auto& row_set_mem_owner = executor_->getRowSetMemoryOwner();
row_set_mem_owner->addVarlenInputBuffer(ab);
CHECK_EQ(Data_Namespace::GPU_LEVEL, memory_level);
CHECK(allocator);
auto chunk_iter_gpu = allocator->alloc(sizeof(ChunkIter));
allocator->copyToDevice(
chunk_iter_gpu, reinterpret_cast<int8_t*>(&chunk_iter), sizeof(ChunkIter));
return chunk_iter_gpu;
}
auto ab = chunk->getBuffer();
CHECK(ab->getMemoryPtr());
return ab->getMemoryPtr(); // @TODO(alex) change to use ChunkIter
}

const int8_t* ColumnFetcher::getAllTableColumnFragments(
Expand All @@ -250,6 +248,19 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments(
const InputDescriptor table_desc(db_id, table_id, int(0));
{
std::lock_guard<std::mutex> columnar_conversion_guard(columnar_fetch_mutex_);

auto col_token = data_provider_->getZeroCopyColumnData(*col_info);
if (col_token != nullptr) {
size_t num_rows = col_token->getSize() / col_token->getType()->size();
auto raw_mem_ptr =
executor_->row_set_mem_owner_->saveDataToken(std::move(col_token));
ColumnarResults res(
{const_cast<int8_t*>(raw_mem_ptr)}, num_rows, col_info->type, thread_idx);

return ColumnFetcher::transferColumnIfNeeded(
&res, 0, memory_level, device_id, device_allocator);
}

auto column_it = columnarized_scan_table_cache_.find({table_id, col_id});
if (column_it == columnarized_scan_table_cache_.end()) {
for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
Expand Down
5 changes: 2 additions & 3 deletions omniscidb/QueryEngine/NativeCodegen.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -711,9 +711,8 @@ std::vector<std::string> get_agg_fnames(
CHECK(target_expr);
auto target_type = target_expr->type();
const auto agg_expr = target_expr->as<hdk::ir::AggExpr>();
const bool is_varlen =
target_type->isString() ||
target_type->isArray(); // TODO: should it use is_varlen_array() ?
// Fixed length arrays are also included here.
const bool is_varlen = target_type->isString() || target_type->isArray();
if (!agg_expr || agg_expr->aggType() == hdk::ir::AggType::kSample) {
result.emplace_back(target_type->isFloatingPoint() ? "agg_id_double" : "agg_id");
if (is_varlen) {
Expand Down
11 changes: 11 additions & 0 deletions omniscidb/ResultSet/RowSetMemoryOwner.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,16 @@ class RowSetMemoryOwner final : public SimpleAllocator, boost::noncopyable {
varlen_buffers_.push_back(varlen_buffer);
}

/**
 * Stores a data token so that the memory it refers to stays referenced by this
 * owner, and returns the memory pointer of the stored token.
 *
 * Tokens are keyed by the pointer they report through getMemoryPtr(). When a
 * token for the same pointer has already been saved, the existing token is kept
 * and the newly passed one is released when this call returns.
 *
 * @param token Token to store; must not be null because it is dereferenced.
 * @return Memory pointer reported by the token held for that key.
 */
const int8_t* saveDataToken(std::unique_ptr<AbstractDataToken> token) {
  std::lock_guard<std::mutex> lock(state_mutex_);
  const auto mem_ptr = token->getMemoryPtr();
  // Look the key up once and reuse the iterator, instead of count() + insert()
  // + operator[] (the latter could also default-insert a null token).
  auto token_it = data_tokens_.find(mem_ptr);
  if (token_it == data_tokens_.end()) {
    token_it = data_tokens_.emplace(mem_ptr, std::move(token)).first;
  }

  return token_it->second->getMemoryPtr();
}

/**
* Adds a GPU buffer containing a variable length input column. Variable length inputs
* on GPU are referenced in output projected targets and should not be freed until the
Expand Down Expand Up @@ -258,6 +268,7 @@ class RowSetMemoryOwner final : public SimpleAllocator, boost::noncopyable {
std::vector<void*> col_buffers_;
std::vector<Data_Namespace::AbstractBuffer*> varlen_input_buffers_;
std::vector<std::unique_ptr<quantile::TDigest>> t_digests_;
// Data tokens stored by saveDataToken(), keyed by the memory pointer each
// token reports.
std::unordered_map<const int8_t*, std::unique_ptr<AbstractDataToken>> data_tokens_;

DataProvider* data_provider_; // for metadata lookups
size_t arena_block_size_; // for cloning
Expand Down
21 changes: 19 additions & 2 deletions omniscidb/ResultSetRegistry/ColumnarResults.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,8 @@ ColumnarResults::ColumnarResults(std::shared_ptr<RowSetMemoryOwner> row_set_mem_
, direct_columnar_conversion_(false)
, thread_idx_(thread_idx) {
auto timer = DEBUG_TIMER(__func__);
const bool is_varlen = target_type->isArray() || target_type->isString();

if (is_varlen) {
if (target_type->isVarLen()) {
throw ColumnarConversionNotSupported();
}
const auto buf_size = num_rows * target_type->size();
Expand All @@ -145,6 +144,24 @@ ColumnarResults::ColumnarResults(std::shared_ptr<RowSetMemoryOwner> row_set_mem_
memcpy(((void*)column_buffers_[0]), one_col_buffer, buf_size);
}

// Builds columnar results directly on top of caller-provided column buffers.
// Only the pointers are stored; the pointed-to data is not copied.
//
// one_col_buffer - column buffer pointers to adopt as column_buffers_.
// num_rows       - number of rows stored in the buffers.
// target_type    - type of the column; variable-length types are rejected.
// thread_idx     - stored in thread_idx_.
//
// Throws ColumnarConversionNotSupported when target_type is variable-length.
//
// The parameter is deliberately non-const here: top-level const on a by-value
// parameter is not part of the function type, so this still matches the
// declaration, and it allows the vector to be moved rather than copied
// (std::move on a const object silently falls back to a copy).
ColumnarResults::ColumnarResults(std::vector<int8_t*> one_col_buffer,
                                 const size_t num_rows,
                                 const hdk::ir::Type* target_type,
                                 const size_t thread_idx)
    : column_buffers_(std::move(one_col_buffer))
    , num_rows_(num_rows)
    , target_types_{target_type}
    , parallel_conversion_(false)
    , direct_columnar_conversion_(false)
    , thread_idx_(thread_idx) {
  auto timer = DEBUG_TIMER(__func__);

  if (target_type->isVarLen()) {
    throw ColumnarConversionNotSupported();
  }
}

std::unique_ptr<ColumnarResults> ColumnarResults::mergeResults(
std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
const std::vector<std::unique_ptr<ColumnarResults>>& sub_results) {
Expand Down
5 changes: 5 additions & 0 deletions omniscidb/ResultSetRegistry/ColumnarResults.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ class ColumnarResults {
const hdk::ir::Type* target_type,
const size_t thread_idx);

// Builds columnar results around caller-provided column buffer pointers.
// The definition stores the pointers in column_buffers_ without copying the
// data they point to, and throws ColumnarConversionNotSupported for
// variable-length target types.
// NOTE(review): presumably the caller must keep the buffers alive for the
// lifetime of this object -- confirm against the call sites.
ColumnarResults(const std::vector<int8_t*> one_col_buffer,
const size_t num_rows,
const hdk::ir::Type* target_type,
const size_t thread_idx);

static std::unique_ptr<ColumnarResults> mergeResults(
const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
const std::vector<std::unique_ptr<ColumnarResults>>& sub_results);
Expand Down

0 comments on commit 3ac48fe

Please sign in to comment.