Skip to content

Commit

Permalink
fix partitioning issue with null values as partition
Browse files Browse the repository at this point in the history
  • Loading branch information
samansmink committed Jun 3, 2024
1 parent 1dfbc38 commit c08dd1b
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 28 deletions.
69 changes: 68 additions & 1 deletion src/functions/delta_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -468,13 +468,80 @@ unique_ptr<MultiFileReaderGlobalState> DeltaMultiFileReader::InitializeGlobalSta
return std::move(res);
}

// This code is duplicated from MultiFileReader::CreateNameMapping the difference is that for columns that are not found
// in the parquet files, we just add null constant columns
static void CustomMulfiFileNameMapping(const string &file_name, const vector<LogicalType> &local_types,
const vector<string> &local_names, const vector<LogicalType> &global_types,
const vector<string> &global_names, const vector<column_t> &global_column_ids,
MultiFileReaderData &reader_data, const string &initial_file,
optional_ptr<MultiFileReaderGlobalState> global_state) {
D_ASSERT(global_types.size() == global_names.size());
D_ASSERT(local_types.size() == local_names.size());
// we have expected types: create a map of name -> column index
case_insensitive_map_t<idx_t> name_map;
for (idx_t col_idx = 0; col_idx < local_names.size(); col_idx++) {
name_map[local_names[col_idx]] = col_idx;
}
for (idx_t i = 0; i < global_column_ids.size(); i++) {
// check if this is a constant column
bool constant = false;
for (auto &entry : reader_data.constant_map) {
if (entry.column_id == i) {
constant = true;
break;
}
}
if (constant) {
// this column is constant for this file
continue;
}
// not constant - look up the column in the name map
auto global_id = global_column_ids[i];
if (global_id >= global_types.size()) {
throw InternalException(
"MultiFileReader::CreatePositionalMapping - global_id is out of range in global_types for this file");
}
auto &global_name = global_names[global_id];
auto entry = name_map.find(global_name);
if (entry == name_map.end()) {
string candidate_names;
for (auto &local_name : local_names) {
if (!candidate_names.empty()) {
candidate_names += ", ";
}
candidate_names += local_name;
}
// FIXME: this override is pretty hacky: for missing columns we just insert NULL constants
auto global_type = global_types[global_id];
Value val (global_type);
reader_data.constant_map.push_back({i, val});
continue;
}
// we found the column in the local file - check if the types are the same
auto local_id = entry->second;
D_ASSERT(global_id < global_types.size());
D_ASSERT(local_id < local_types.size());
auto &global_type = global_types[global_id];
auto &local_type = local_types[local_id];
if (global_type != local_type) {
reader_data.cast_map[local_id] = global_type;
}
// the types are the same - create the mapping
reader_data.column_mapping.push_back(i);
reader_data.column_ids.push_back(local_id);
}

reader_data.empty_columns = reader_data.column_ids.empty();
}

void DeltaMultiFileReader::CreateNameMapping(const string &file_name, const vector<LogicalType> &local_types,
const vector<string> &local_names, const vector<LogicalType> &global_types,
const vector<string> &global_names, const vector<column_t> &global_column_ids,
MultiFileReaderData &reader_data, const string &initial_file,
optional_ptr<MultiFileReaderGlobalState> global_state) {
// First call the base implementation to do most mapping
MultiFileReader::CreateNameMapping(file_name, local_types, local_names, global_types, global_names, global_column_ids, reader_data, initial_file, global_state);
CustomMulfiFileNameMapping(file_name, local_types, local_names, global_types, global_names, global_column_ids,
reader_data, initial_file, global_state);

// Then we handle delta specific mapping
D_ASSERT(global_state);
Expand Down
17 changes: 2 additions & 15 deletions test/sql/dat/all.test
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,6 @@ SELECT *
FROM parquet_scan('${DAT_PATH}/out/reader_tests/generated/with_schema_change/expected/latest/**/*.parquet')
----


### FAILING DAT TESTS

# TODO fix all of these
mode skip

# basic_partitioned
query I rowsort basic_partitioned
SELECT *
Expand All @@ -71,16 +65,9 @@ SELECT *
FROM parquet_scan('${DAT_PATH}/out/reader_tests/generated/basic_partitioned/expected/latest/**/*.parquet')
----

# multi_partitioned
query I rowsort multi_partitioned
SELECT *
FROM delta_scan('${DAT_PATH}/out/reader_tests/generated/multi_partitioned/delta')
----
### FAILING DAT TESTS

query I rowsort multi_partitioned
SELECT *
FROM parquet_scan('${DAT_PATH}/out/reader_tests/generated/multi_partitioned/expected/latest/**/*.parquet')
----
mode skip

# multi_partitioned
query I rowsort multi_partitioned
Expand Down
7 changes: 0 additions & 7 deletions test/sql/dat/basic_append.test
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,6 @@ FROM delta_scan('${DAT_PATH}/out/reader_tests/generated/basic_append/delta')
2
3

# TODO: Figure out what's wrong here
mode skip

# Now we add a filter that filters out one of the files
query II
SELECT letter, number
Expand All @@ -67,8 +64,6 @@ WHERE number < 2
----
a 1

mode unskip

# Now we add a filter that filters out the other file
query III
SELECT a_float, letter, number,
Expand All @@ -77,8 +72,6 @@ WHERE number > 4
----
5.5 e 5

mode skip

# Now we add a filter that filters out all columns
query III
SELECT a_float, number, letter
Expand Down
12 changes: 7 additions & 5 deletions test/sql/delta_kernel_rs/basic_partitioned.test
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ require delta

require-env DELTA_KERNEL_TESTS_PATH

# FIXME: this fails due some weird error
mode skip

statement error
query III
SELECT * FROM delta_scan('${DELTA_KERNEL_TESTS_PATH}/basic_partitioned')
----
Failed to read file "/Users/sam/Development/delta-kernel-testing/delta-kernel-rs/kernel/tests/data/basic_partitioned/letter=__HIVE_DEFAULT_PARTITION__
NULL 6 6.6
a 4 4.4
e 5 5.5
a 1 1.1
b 2 2.2
c 3 3.3

0 comments on commit c08dd1b

Please sign in to comment.