
Avoid unnecessary chunk stats recomputation on append.
Signed-off-by: ienkovich <[email protected]>
ienkovich committed Aug 24, 2023
1 parent c603402 commit 9d026ff
Showing 1 changed file with 13 additions and 7 deletions.
20 changes: 13 additions & 7 deletions omniscidb/ArrowStorage/ArrowStorage.cpp
@@ -716,6 +716,7 @@ void ArrowStorage::appendArrowTable(std::shared_ptr<arrow::Table> at, int table_
  (static_cast<size_t>(at->num_rows()) + table.fragment_size - 1 - first_frag_size) /
  table.fragment_size +
  1;
+ size_t last_orig_frag_idx = fragments.empty() ? 0 : fragments.size() - 1;
  // Pre-allocate fragment infos and table stats for each column for the following
  // parallel data import.
  fragments.resize(frag_count);
@@ -815,9 +816,9 @@ void ArrowStorage::appendArrowTable(std::shared_ptr<arrow::Table> at, int table_
  if (col_type->isFixedLenArray()) {
  elems_count = col_type->size() / elem_type->size();
  }
- // Compute stats for each fragment.
+ // Compute stats for each added/modified fragment.
  tbb::parallel_for(
- tbb::blocked_range(size_t(0), frag_count), [&](auto frag_range) {
+ tbb::blocked_range(last_orig_frag_idx, frag_count), [&](auto frag_range) {
  for (size_t frag_idx = frag_range.begin(); frag_idx != frag_range.end();
  ++frag_idx) {
  auto& frag = fragments[frag_idx];
@@ -855,17 +856,21 @@ void ArrowStorage::appendArrowTable(std::shared_ptr<arrow::Table> at, int table_
  }
  }); // each fragment

- // Merge fragment stats to the table stats.
+ // Merge added/modified fragment stats to the table stats.
  auto& column_stats = table_stats.at(col_info->column_id);
- column_stats = fragments[0].metadata[col_idx]->chunkStats();
- for (size_t frag_idx = 1; frag_idx < frag_count; ++frag_idx) {
+ if (!last_orig_frag_idx) {
+ column_stats = fragments[0].metadata[col_idx]->chunkStats();
+ }
+ for (size_t frag_idx = last_orig_frag_idx ? last_orig_frag_idx : 1;
+ frag_idx < frag_count;
+ ++frag_idx) {
  mergeStats(column_stats,
  fragments[frag_idx].metadata[col_idx]->chunkStats(),
  col_type);
  }
  } else {
  bool has_nulls = false;
- for (size_t frag_idx = 0; frag_idx < frag_count; ++frag_idx) {
+ for (size_t frag_idx = last_orig_frag_idx; frag_idx < frag_count; ++frag_idx) {
  auto& frag = fragments[frag_idx];
  frag.offset =
  frag_idx ? ((frag_idx - 1) * table.fragment_size + first_frag_size) : 0;
@@ -886,7 +891,8 @@ void ArrowStorage::appendArrowTable(std::shared_ptr<arrow::Table> at, int table_
  }

  auto& column_stats = table_stats.at(col_info->column_id);
- column_stats.has_nulls = has_nulls;
+ column_stats.has_nulls =
+ last_orig_frag_idx ? (has_nulls || column_stats.has_nulls) : has_nulls;
  column_stats.min.stringval = nullptr;
  column_stats.max.stringval = nullptr;