From f6810357ba1ebe3dbfbe93acbeaaec1496d09d7c Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Thu, 23 Jan 2025 18:00:13 +0100 Subject: [PATCH] [EXP][C++] Deduplicate schemas when scanning Dataset --- cpp/src/arrow/CMakeLists.txt | 1 + cpp/src/arrow/dataset/scanner.cc | 22 ++++- cpp/src/arrow/util/deduplicate_internal.cc | 93 ++++++++++++++++++++++ cpp/src/arrow/util/deduplicate_internal.h | 35 ++++++++ cpp/src/parquet/arrow/reader.cc | 2 +- 5 files changed, 148 insertions(+), 5 deletions(-) create mode 100644 cpp/src/arrow/util/deduplicate_internal.cc create mode 100644 cpp/src/arrow/util/deduplicate_internal.h diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index eb9860b240f16..a6136fba5aa2d 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -516,6 +516,7 @@ set(ARROW_UTIL_SRCS util/crc32.cc util/debug.cc util/decimal.cc + util/deduplicate_internal.cc util/delimiting.cc util/dict_util.cc util/fixed_width_internal.cc diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index 0df8fd802656c..b9437306942d3 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -40,6 +40,7 @@ #include "arrow/table.h" #include "arrow/util/async_generator.h" #include "arrow/util/config.h" +#include "arrow/util/deduplicate_internal.h" #include "arrow/util/iterator.h" #include "arrow/util/logging.h" #include "arrow/util/task_group.h" @@ -291,6 +292,17 @@ class AsyncScanner : public Scanner, public std::enable_shared_from_this dataset_; }; +void DeduplicateSchema(std::shared_ptr* batch) { + const auto& schema = (*batch)->schema(); + auto deduplicated_schema = ::arrow::util::Deduplicate(schema); + ARROW_LOG(INFO) << "Schema before deduplication = " << schema.get() << ", after = " << deduplicated_schema.get(); + if (deduplicated_schema != schema) { + // TODO ReplaceSchema creates a new RecordBatch object, + // should have an in-place RecordBatch::DeduplicateSchema + *batch = (*batch)->ReplaceSchema(deduplicated_schema).ValueOrDie(); + } +} + Result FragmentToBatches( const Enumerated>& fragment, const std::shared_ptr& options) { @@ -318,10 +330,11 @@ Result FragmentToBatches( RecordBatch::Make(options->dataset_schema, /*num_rows=*/0, std::move(columns))); auto enumerated_batch_gen = MakeEnumeratedGenerator(std::move(batch_gen)); - auto combine_fn = - [fragment](const Enumerated>& record_batch) { - return EnumeratedRecordBatch{record_batch, fragment}; - }; + auto combine_fn = [fragment](Enumerated> record_batch) { + EnumeratedRecordBatch out{record_batch, fragment}; + DeduplicateSchema(&out.record_batch.value); + return out; + }; return MakeMappedGenerator(enumerated_batch_gen, std::move(combine_fn)); } @@ -423,6 +436,7 @@ Result ToEnumeratedRecordBatch( out.record_batch.last = batch->values[num_fields + 2].scalar_as().value; ARROW_ASSIGN_OR_RAISE(out.record_batch.value, batch->ToRecordBatch(options.projected_schema, options.pool)); + DeduplicateSchema(&out.record_batch.value); return out; } diff --git a/cpp/src/arrow/util/deduplicate_internal.cc b/cpp/src/arrow/util/deduplicate_internal.cc new file mode 100644 index 0000000000000..e58a6efa30004 --- /dev/null +++ b/cpp/src/arrow/util/deduplicate_internal.cc @@ -0,0 +1,93 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/util/deduplicate_internal.h" + +#include +#include +#include + +#include "arrow/type.h" +#include "arrow/util/logging.h" + +namespace arrow::util { +namespace { + +template +struct Deduplicator { + static_assert(std::is_base_of_v<::arrow::detail::Fingerprintable, T>); + + std::shared_ptr Deduplicate(std::shared_ptr obj) { + auto lock = std::lock_guard{mutex_}; + if (ARROW_PREDICT_FALSE(++lookups_since_last_pruning_ >= kPruneEvery)) { + Prune(); + lookups_since_last_pruning_ = 0; + } + + const std::string& fingerprint = obj->fingerprint(); + if (fingerprint.empty()) { + // Fingerprinting failure + return obj; + } + auto [it, inserted] = cache_.try_emplace(fingerprint, obj); + if (inserted) { + return obj; + } + auto cached = it->second.lock(); + if (cached) { + return cached; + } + it->second = obj; + return obj; + } + + private: + void Prune() { + const size_t size_before = cache_.size(); + + auto it = cache_.begin(); + while (it != cache_.end()) { + auto cur = it++; // cur will be invalidated on erasure, so increment now + if (cur->second.expired()) { + cache_.erase(cur); + } + } + } + + static constexpr int kPruneEvery = 100; + + std::mutex mutex_; + // TODO fingerprints can be large, we should use a fast cryptographic hash instead, + // such as Blake3 + std::unordered_map> cache_; + int lookups_since_last_pruning_ = 0; +}; + +Deduplicator g_field_deduplicator; +Deduplicator g_schema_deduplicator; + +} // namespace + +std::shared_ptr Deduplicate(std::shared_ptr field) { + return g_field_deduplicator.Deduplicate(std::move(field)); +} + +std::shared_ptr Deduplicate(std::shared_ptr schema) { + return g_schema_deduplicator.Deduplicate(std::move(schema)); +} + +} // namespace arrow::util diff --git a/cpp/src/arrow/util/deduplicate_internal.h b/cpp/src/arrow/util/deduplicate_internal.h new file mode 100644 index 0000000000000..2bbc55547abc7 --- /dev/null +++ b/cpp/src/arrow/util/deduplicate_internal.h @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include + +#include "arrow/type_fwd.h" +#include "arrow/util/macros.h" + +namespace arrow::util { + +ARROW_EXPORT +std::shared_ptr Deduplicate(std::shared_ptr field); + +ARROW_EXPORT +std::shared_ptr Deduplicate(std::shared_ptr schema); + +} // namespace arrow::util diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 465ad5844d31a..de6db3875c228 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -1259,7 +1259,7 @@ Future> FileReaderImpl::DecodeRowGroups( return column; }; auto make_table = [result_schema, row_groups, self, - this](const ::arrow::ChunkedArrayVector& columns) + this](const ::arrow::ChunkedArrayVector& columns) mutable -> ::arrow::Result> { int64_t num_rows = 0; if (!columns.empty()) {