Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add progress bar support for index creation #22

Merged
merged 2 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion duckdb
Submodule duckdb updated 1639 files
26 changes: 26 additions & 0 deletions src/hnsw/hnsw_index_physical_create.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ class CreateHNSWIndexGlobalState final : public GlobalSinkState {

// Parallel scan state
ColumnDataParallelScanState scan_state;

// Track which phase we're in
atomic<bool> is_building = {false};
atomic<idx_t> loaded_count = {0};
atomic<idx_t> built_count = {0};
};

unique_ptr<GlobalSinkState> PhysicalCreateHNSWIndex::GetGlobalSinkState(ClientContext &context) const {
Expand Down Expand Up @@ -90,7 +95,9 @@ SinkResultType PhysicalCreateHNSWIndex::Sink(ExecutionContext &context, DataChun
OperatorSinkInput &input) const {

auto &lstate = input.local_state.Cast<CreateHNSWIndexLocalState>();
auto &gstate = input.global_state.Cast<CreateHNSWIndexGlobalState>();
lstate.collection->Append(lstate.append_state, chunk);
gstate.loaded_count += chunk.size();
return SinkResultType::NEED_MORE_INPUT;
}

Expand Down Expand Up @@ -178,6 +185,9 @@ class HNSWIndexConstructTask final : public ExecutorTask {
}
}

// Update the built count
gstate.built_count += count;

if (mode == TaskExecutionMode::PROCESS_PARTIAL) {
// yield!
return TaskExecutionResult::TASK_NOT_FINISHED;
Expand Down Expand Up @@ -273,6 +283,9 @@ SinkFinalizeType PhysicalCreateHNSWIndex::Finalize(Pipeline &pipeline, Event &ev
auto &gstate = input.global_state.Cast<CreateHNSWIndexGlobalState>();
auto &collection = gstate.collection;

// Move on to the next phase
gstate.is_building = true;

// Reserve the index size
auto &index = gstate.global_index->index;
index.reserve(collection->Count());
Expand All @@ -287,4 +300,17 @@ SinkFinalizeType PhysicalCreateHNSWIndex::Finalize(Pipeline &pipeline, Event &ev
return SinkFinalizeType::READY;
}

// Reports the progress of the CREATE INDEX sink as a percentage.
//
// The work is split into two equally weighted phases:
//   - 0..50:   rows are being appended to the collection (tracked by loaded_count,
//              measured against the operator's estimated_cardinality and clamped to 1.0)
//   - 50..100: the index is being built (built_count measured against loaded_count)
//
// Both ratios are guarded against a zero denominator, which would otherwise produce NaN
// (0/0) for an empty input or a missing cardinality estimate.
//
// context and source_progress are unused.
double PhysicalCreateHNSWIndex::GetSinkProgress(ClientContext &context, GlobalSinkState &gstate,
                                                double source_progress) const {
	// The "source_progress" is not relevant for CREATE INDEX statements
	const auto &state = gstate.Cast<CreateHNSWIndexGlobalState>();

	// Read the phase flag first and the counter second, each exactly once, so the zero guards
	// and the divisions below operate on the same values.
	const bool building = state.is_building;
	const idx_t loaded = state.loaded_count;

	// First half of the progress is appending to the collection
	if (!building) {
		if (estimated_cardinality == 0) {
			// No usable estimate. For loaded > 0 the unguarded division yields inf, which the
			// clamp turns into 1.0 (i.e. 50%); keep that. For loaded == 0 it would be NaN, so
			// report that nothing has happened yet.
			return loaded > 0 ? 50.0 : 0.0;
		}
		return 50.0 * MinValue(1.0, static_cast<double>(loaded) / static_cast<double>(estimated_cardinality));
	}

	// Second half is actually building the index
	if (loaded == 0) {
		// Nothing was loaded, so there is nothing to build; avoid 0/0 and report completion.
		return 100.0;
	}
	return 50.0 + (50.0 * static_cast<double>(state.built_count) / static_cast<double>(loaded));
}

} // namespace duckdb
2 changes: 2 additions & 0 deletions src/include/hnsw/hnsw_index_physical_create.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ class PhysicalCreateHNSWIndex : public PhysicalOperator {
// Overrides the base-class hook and unconditionally returns true.
// NOTE(review): presumably this tells the executor that Sink() may be invoked from several
// threads at once — confirm against the PhysicalOperator base class.
bool ParallelSink() const override {
return true;
}

double GetSinkProgress(ClientContext &context, GlobalSinkState &gstate, double source_progress) const override;
};

} // namespace duckdb
Loading