Skip to content

Commit

Permalink
Merge pull request #45 from devendrasr/main
Browse files Browse the repository at this point in the history
Support to skip schema inference
  • Loading branch information
samansmink authored Mar 20, 2024
2 parents f7f1a35 + 8686446 commit 6bda4df
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 31 deletions.
24 changes: 15 additions & 9 deletions src/common/iceberg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,37 +130,41 @@ unique_ptr<SnapshotParseInfo> IcebergSnapshot::GetParseInfo(const string &path,
return std::move(parse_info);
}

//! Returns the most recent snapshot recorded in the Iceberg table metadata at `path`.
//! @param path                        Root path of the Iceberg table.
//! @param fs                          Filesystem used to read the metadata files.
//! @param metadata_compression_codec  Codec the metadata files are compressed with ("none" for plain JSON).
//! @param skip_schema_inference       When true, the snapshot's schema is not parsed (ret.schema stays empty).
//! @throws IOException if the metadata contains no snapshots.
IcebergSnapshot IcebergSnapshot::GetLatestSnapshot(const string &path, FileSystem &fs,
                                                   string metadata_compression_codec, bool skip_schema_inference) {
	auto info = GetParseInfo(path, fs, metadata_compression_codec);
	auto latest_snapshot = FindLatestSnapshotInternal(info->snapshots);

	if (!latest_snapshot) {
		throw IOException("No snapshots found");
	}

	return ParseSnapShot(latest_snapshot, info->iceberg_version, info->schema_id, info->schemas,
	                     metadata_compression_codec, skip_schema_inference);
}

//! Returns the snapshot with the exact id `snapshot_id` from the Iceberg table metadata at `path`.
//! @param path                        Root path of the Iceberg table.
//! @param fs                          Filesystem used to read the metadata files.
//! @param snapshot_id                 Id of the snapshot to look up.
//! @param metadata_compression_codec  Codec the metadata files are compressed with ("none" for plain JSON).
//! @param skip_schema_inference       When true, the snapshot's schema is not parsed (ret.schema stays empty).
//! @throws IOException if no snapshot with the given id exists.
IcebergSnapshot IcebergSnapshot::GetSnapshotById(const string &path, FileSystem &fs, idx_t snapshot_id,
                                                 string metadata_compression_codec, bool skip_schema_inference) {
	auto info = GetParseInfo(path, fs, metadata_compression_codec);
	auto snapshot = FindSnapshotByIdInternal(info->snapshots, snapshot_id);

	if (!snapshot) {
		throw IOException("Could not find snapshot with id " + to_string(snapshot_id));
	}

	return ParseSnapShot(snapshot, info->iceberg_version, info->schema_id, info->schemas,
	                     metadata_compression_codec, skip_schema_inference);
}

//! Returns the latest snapshot whose timestamp is at or before `timestamp`.
//! @param path                        Root path of the Iceberg table.
//! @param fs                          Filesystem used to read the metadata files.
//! @param timestamp                   Point in time to resolve a snapshot for.
//! @param metadata_compression_codec  Codec the metadata files are compressed with ("none" for plain JSON).
//! @param skip_schema_inference       When true, the snapshot's schema is not parsed (ret.schema stays empty).
//! @throws IOException if no snapshot exists at or before the given timestamp.
IcebergSnapshot IcebergSnapshot::GetSnapshotByTimestamp(const string &path, FileSystem &fs, timestamp_t timestamp,
                                                        string metadata_compression_codec, bool skip_schema_inference) {
	auto info = GetParseInfo(path, fs, metadata_compression_codec);
	auto snapshot = FindSnapshotByIdTimestampInternal(info->snapshots, timestamp);

	if (!snapshot) {
		throw IOException("Could not find latest snapshots for timestamp " + Timestamp::ToString(timestamp));
	}

	return ParseSnapShot(snapshot, info->iceberg_version, info->schema_id, info->schemas,
	                     metadata_compression_codec, skip_schema_inference);
}

// Function to generate a metadata file url
Expand Down Expand Up @@ -188,7 +192,8 @@ string IcebergSnapshot::ReadMetaData(const string &path, FileSystem &fs, string
}

IcebergSnapshot IcebergSnapshot::ParseSnapShot(yyjson_val *snapshot, idx_t iceberg_format_version, idx_t schema_id,
vector<yyjson_val *> &schemas, string metadata_compression_codec) {
vector<yyjson_val *> &schemas, string metadata_compression_codec,
bool skip_schema_inference) {
IcebergSnapshot ret;
auto snapshot_tag = yyjson_get_tag(snapshot);
if (snapshot_tag != YYJSON_TYPE_OBJ) {
Expand All @@ -206,8 +211,9 @@ IcebergSnapshot IcebergSnapshot::ParseSnapShot(yyjson_val *snapshot, idx_t icebe
ret.manifest_list = IcebergUtils::TryGetStrFromObject(snapshot, "manifest-list");
ret.iceberg_format_version = iceberg_format_version;
ret.schema_id = schema_id;
ret.schema = ParseSchema(schemas, ret.schema_id);

if (!skip_schema_inference) {
ret.schema = ParseSchema(schemas, ret.schema_id);
}
return ret;
}

Expand Down
13 changes: 9 additions & 4 deletions src/iceberg_functions/iceberg_metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,28 +56,30 @@ static unique_ptr<FunctionData> IcebergMetaDataBind(ClientContext &context, Tabl

bool allow_moved_paths = false;
string metadata_compression_codec = "none";
bool skip_schema_inference = false;

for (auto &kv : input.named_parameters) {
auto loption = StringUtil::Lower(kv.first);
if (loption == "allow_moved_paths") {
allow_moved_paths = BooleanValue::Get(kv.second);
} else if (loption == "metadata_compression_codec") {
metadata_compression_codec = StringValue::Get(kv.second);
} else if (loption == "skip_schema_inference") {
skip_schema_inference = BooleanValue::Get(kv.second);
}
}

IcebergSnapshot snapshot_to_scan;
if (input.inputs.size() > 1) {
if (input.inputs[1].type() == LogicalType::UBIGINT) {
snapshot_to_scan = IcebergSnapshot::GetSnapshotById(iceberg_path, fs, input.inputs[1].GetValue<uint64_t>(), metadata_compression_codec);
snapshot_to_scan = IcebergSnapshot::GetSnapshotById(iceberg_path, fs, input.inputs[1].GetValue<uint64_t>(), metadata_compression_codec, skip_schema_inference);
} else if (input.inputs[1].type() == LogicalType::TIMESTAMP) {
snapshot_to_scan =
IcebergSnapshot::GetSnapshotByTimestamp(iceberg_path, fs, input.inputs[1].GetValue<timestamp_t>(), metadata_compression_codec);
IcebergSnapshot::GetSnapshotByTimestamp(iceberg_path, fs, input.inputs[1].GetValue<timestamp_t>(), metadata_compression_codec, skip_schema_inference);
} else {
throw InvalidInputException("Unknown argument type in IcebergScanBindReplace.");
}
} else {
snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_path, fs, metadata_compression_codec);
snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_path, fs, metadata_compression_codec, skip_schema_inference);
}

ret->iceberg_table =
Expand Down Expand Up @@ -141,18 +143,21 @@ TableFunctionSet IcebergFunctions::GetIcebergMetadataFunction() {

auto fun = TableFunction({LogicalType::VARCHAR}, IcebergMetaDataFunction, IcebergMetaDataBind,
IcebergMetaDataGlobalTableFunctionState::Init);
fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN;
fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN;
fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
function_set.AddFunction(fun);

fun = TableFunction({LogicalType::VARCHAR, LogicalType::UBIGINT}, IcebergMetaDataFunction, IcebergMetaDataBind,
IcebergMetaDataGlobalTableFunctionState::Init);
fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN;
fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN;
fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
function_set.AddFunction(fun);

fun = TableFunction({LogicalType::VARCHAR, LogicalType::TIMESTAMP}, IcebergMetaDataFunction, IcebergMetaDataBind,
IcebergMetaDataGlobalTableFunctionState::Init);
fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN;
fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN;
fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
function_set.AddFunction(fun);
Expand Down
34 changes: 21 additions & 13 deletions src/iceberg_functions/iceberg_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,16 +129,18 @@ static Value GetParquetSchemaParam(vector<IcebergColumnDefinition> &schema) {

//! Build the Parquet Scan expression for the files we need to scan
static unique_ptr<TableRef> MakeScanExpression(vector<Value> &data_file_values, vector<Value> &delete_file_values,
vector<IcebergColumnDefinition> &schema, bool allow_moved_paths, string metadata_compression_codec) {
vector<IcebergColumnDefinition> &schema, bool allow_moved_paths, string metadata_compression_codec, bool skip_schema_inference) {
// No deletes, just return a TableFunctionRef for a parquet scan of the data files
if (delete_file_values.empty()) {
auto table_function_ref_data = make_uniq<TableFunctionRef>();
table_function_ref_data->alias = "iceberg_scan_data";
vector<unique_ptr<ParsedExpression>> left_children;
left_children.push_back(make_uniq<ConstantExpression>(Value::LIST(data_file_values)));
left_children.push_back(
make_uniq<ComparisonExpression>(ExpressionType::COMPARE_EQUAL, make_uniq<ColumnRefExpression>("schema"),
make_uniq<ConstantExpression>(GetParquetSchemaParam(schema))));
if (!skip_schema_inference) {
left_children.push_back(
make_uniq<ComparisonExpression>(ExpressionType::COMPARE_EQUAL, make_uniq<ColumnRefExpression>("schema"),
make_uniq<ConstantExpression>(GetParquetSchemaParam(schema))));
}
table_function_ref_data->function = make_uniq<FunctionExpression>("parquet_scan", std::move(left_children));
return std::move(table_function_ref_data);
}
Expand Down Expand Up @@ -169,10 +171,11 @@ static unique_ptr<TableRef> MakeScanExpression(vector<Value> &data_file_values,
left_children.push_back(make_uniq<ComparisonExpression>(ExpressionType::COMPARE_EQUAL,
make_uniq<ColumnRefExpression>("file_row_number"),
make_uniq<ConstantExpression>(Value(1))));
left_children.push_back(
make_uniq<ComparisonExpression>(ExpressionType::COMPARE_EQUAL, make_uniq<ColumnRefExpression>("schema"),
make_uniq<ConstantExpression>(GetParquetSchemaParam(schema))));

if (!skip_schema_inference) {
left_children.push_back(
make_uniq<ComparisonExpression>(ExpressionType::COMPARE_EQUAL, make_uniq<ColumnRefExpression>("schema"),
make_uniq<ConstantExpression>(GetParquetSchemaParam(schema))));
}
table_function_ref_data->function = make_uniq<FunctionExpression>("parquet_scan", std::move(left_children));
join_node->left = std::move(table_function_ref_data);

Expand Down Expand Up @@ -208,6 +211,7 @@ static unique_ptr<TableRef> IcebergScanBindReplace(ClientContext &context, Table
// this allows hive tables to be moved and have mismatching paths, useful for testing, but will have worse
// performance
bool allow_moved_paths = false;
bool skip_schema_inference = false;
string mode = "default";
string metadata_compression_codec = "none";

Expand All @@ -223,21 +227,22 @@ static unique_ptr<TableRef> IcebergScanBindReplace(ClientContext &context, Table
mode = StringValue::Get(kv.second);
} else if (loption == "metadata_compression_codec") {
metadata_compression_codec = StringValue::Get(kv.second);
} else if (loption == "skip_schema_inference") {
skip_schema_inference = BooleanValue::Get(kv.second);
}
}

IcebergSnapshot snapshot_to_scan;
if (input.inputs.size() > 1) {
if (input.inputs[1].type() == LogicalType::UBIGINT) {
snapshot_to_scan = IcebergSnapshot::GetSnapshotById(iceberg_path, fs, input.inputs[1].GetValue<uint64_t>(), metadata_compression_codec);
snapshot_to_scan = IcebergSnapshot::GetSnapshotById(iceberg_path, fs, input.inputs[1].GetValue<uint64_t>(), metadata_compression_codec, skip_schema_inference);
} else if (input.inputs[1].type() == LogicalType::TIMESTAMP) {
snapshot_to_scan =
IcebergSnapshot::GetSnapshotByTimestamp(iceberg_path, fs, input.inputs[1].GetValue<timestamp_t>(), metadata_compression_codec);
IcebergSnapshot::GetSnapshotByTimestamp(iceberg_path, fs, input.inputs[1].GetValue<timestamp_t>(), metadata_compression_codec, skip_schema_inference);
} else {
throw InvalidInputException("Unknown argument type in IcebergScanBindReplace.");
}
} else {
snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_path, fs, metadata_compression_codec);
snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_path, fs, metadata_compression_codec, skip_schema_inference);
}

IcebergTable iceberg_table = IcebergTable::Load(iceberg_path, snapshot_to_scan, fs, allow_moved_paths, metadata_compression_codec);
Expand All @@ -257,7 +262,7 @@ static unique_ptr<TableRef> IcebergScanBindReplace(ClientContext &context, Table
if (mode == "list_files") {
return MakeListFilesExpression(data_file_values, delete_file_values);
} else if (mode == "default") {
return MakeScanExpression(data_file_values, delete_file_values, snapshot_to_scan.schema, allow_moved_paths, metadata_compression_codec);
return MakeScanExpression(data_file_values, delete_file_values, snapshot_to_scan.schema, allow_moved_paths, metadata_compression_codec, skip_schema_inference);
} else {
throw NotImplementedException("Unknown mode type for ICEBERG_SCAN bind : '" + mode + "'");
}
Expand All @@ -268,6 +273,7 @@ TableFunctionSet IcebergFunctions::GetIcebergScanFunction() {

auto fun = TableFunction({LogicalType::VARCHAR}, nullptr, nullptr, IcebergScanGlobalTableFunctionState::Init);
fun.bind_replace = IcebergScanBindReplace;
fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN;
fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN;
fun.named_parameters["mode"] = LogicalType::VARCHAR;
fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
Expand All @@ -276,6 +282,7 @@ TableFunctionSet IcebergFunctions::GetIcebergScanFunction() {
fun = TableFunction({LogicalType::VARCHAR, LogicalType::UBIGINT}, nullptr, nullptr,
IcebergScanGlobalTableFunctionState::Init);
fun.bind_replace = IcebergScanBindReplace;
fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN;
fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN;
fun.named_parameters["mode"] = LogicalType::VARCHAR;
fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
Expand All @@ -284,6 +291,7 @@ TableFunctionSet IcebergFunctions::GetIcebergScanFunction() {
fun = TableFunction({LogicalType::VARCHAR, LogicalType::TIMESTAMP}, nullptr, nullptr,
IcebergScanGlobalTableFunctionState::Init);
fun.bind_replace = IcebergScanBindReplace;
fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN;
fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN;
fun.named_parameters["mode"] = LogicalType::VARCHAR;
fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
Expand Down
10 changes: 9 additions & 1 deletion src/iceberg_functions/iceberg_snapshots.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ struct IcebergSnaphotsBindData : public TableFunctionData {
IcebergSnaphotsBindData() {};
string filename;
string metadata_compression_codec;
bool skip_schema_inference = false;
};

struct IcebergSnapshotGlobalTableFunctionState : public GlobalTableFunctionState {
Expand Down Expand Up @@ -49,15 +50,19 @@ static unique_ptr<FunctionData> IcebergSnapshotsBind(ClientContext &context, Tab
auto bind_data = make_uniq<IcebergSnaphotsBindData>();

string metadata_compression_codec = "none";
bool skip_schema_inference = false;

for (auto &kv : input.named_parameters) {
auto loption = StringUtil::Lower(kv.first);
if (loption == "metadata_compression_codec") {
metadata_compression_codec = StringValue::Get(kv.second);
} else if (loption == "skip_schema_inference") {
skip_schema_inference = BooleanValue::Get(kv.second);
}
}
bind_data->filename = input.inputs[0].ToString();
bind_data->metadata_compression_codec = metadata_compression_codec;
bind_data->skip_schema_inference = skip_schema_inference;

names.emplace_back("sequence_number");
return_types.emplace_back(LogicalType::UBIGINT);
Expand Down Expand Up @@ -88,9 +93,11 @@ static void IcebergSnapshotsFunction(ClientContext &context, TableFunctionInput
break;
}


auto parse_info = IcebergSnapshot::GetParseInfo(*global_state.metadata_doc);
auto snapshot = IcebergSnapshot::ParseSnapShot(next_snapshot, global_state.iceberg_format_version,
parse_info->schema_id, parse_info->schemas, bind_data.metadata_compression_codec);
parse_info->schema_id, parse_info->schemas, bind_data.metadata_compression_codec,
bind_data.skip_schema_inference);

FlatVector::GetData<int64_t>(output.data[0])[i] = snapshot.sequence_number;
FlatVector::GetData<int64_t>(output.data[1])[i] = snapshot.snapshot_id;
Expand All @@ -108,6 +115,7 @@ TableFunctionSet IcebergFunctions::GetIcebergSnapshotsFunction() {
TableFunction table_function({LogicalType::VARCHAR}, IcebergSnapshotsFunction, IcebergSnapshotsBind,
IcebergSnapshotGlobalTableFunctionState::Init);
table_function.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;
table_function.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN;
function_set.AddFunction(table_function);
return std::move(function_set);
}
Expand Down
8 changes: 4 additions & 4 deletions src/include/iceberg_metadata.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ class IcebergSnapshot {
vector<IcebergColumnDefinition> schema;
string metadata_compression_codec = "none";

static IcebergSnapshot GetLatestSnapshot(const string &path, FileSystem &fs, string GetSnapshotByTimestamp);
static IcebergSnapshot GetSnapshotById(const string &path, FileSystem &fs, idx_t snapshot_id, string GetSnapshotByTimestamp);
static IcebergSnapshot GetSnapshotByTimestamp(const string &path, FileSystem &fs, timestamp_t timestamp, string GetSnapshotByTimestamp);
static IcebergSnapshot GetLatestSnapshot(const string &path, FileSystem &fs, string GetSnapshotByTimestamp, bool skip_schema_inference);
static IcebergSnapshot GetSnapshotById(const string &path, FileSystem &fs, idx_t snapshot_id, string GetSnapshotByTimestamp, bool skip_schema_inference);
static IcebergSnapshot GetSnapshotByTimestamp(const string &path, FileSystem &fs, timestamp_t timestamp, string GetSnapshotByTimestamp, bool skip_schema_inference);

static IcebergSnapshot ParseSnapShot(yyjson_val *snapshot, idx_t iceberg_format_version, idx_t schema_id,
vector<yyjson_val *> &schemas, string metadata_compression_codec);
vector<yyjson_val *> &schemas, string metadata_compression_codec, bool skip_schema_inference);
static string ReadMetaData(const string &path, FileSystem &fs, string GetSnapshotByTimestamp);
static yyjson_val *GetSnapshots(const string &path, FileSystem &fs, string GetSnapshotByTimestamp);
static unique_ptr<SnapshotParseInfo> GetParseInfo(yyjson_doc &metadata_json);
Expand Down

0 comments on commit 6bda4df

Please sign in to comment.