diff --git a/src/common/iceberg.cpp b/src/common/iceberg.cpp index 3ce0c5b..ba2f284 100644 --- a/src/common/iceberg.cpp +++ b/src/common/iceberg.cpp @@ -130,7 +130,8 @@ unique_ptr IcebergSnapshot::GetParseInfo(const string &path, return std::move(parse_info); } -IcebergSnapshot IcebergSnapshot::GetLatestSnapshot(const string &path, FileSystem &fs, string metadata_compression_codec) { +IcebergSnapshot IcebergSnapshot::GetLatestSnapshot(const string &path, FileSystem &fs, + string metadata_compression_codec, bool skip_schema_inference) { auto info = GetParseInfo(path, fs, metadata_compression_codec); auto latest_snapshot = FindLatestSnapshotInternal(info->snapshots); @@ -138,10 +139,11 @@ IcebergSnapshot IcebergSnapshot::GetLatestSnapshot(const string &path, FileSyste throw IOException("No snapshots found"); } - return ParseSnapShot(latest_snapshot, info->iceberg_version, info->schema_id, info->schemas, metadata_compression_codec); + return ParseSnapShot(latest_snapshot, info->iceberg_version, info->schema_id, info->schemas, metadata_compression_codec, skip_schema_inference); } -IcebergSnapshot IcebergSnapshot::GetSnapshotById(const string &path, FileSystem &fs, idx_t snapshot_id, string metadata_compression_codec) { +IcebergSnapshot IcebergSnapshot::GetSnapshotById(const string &path, FileSystem &fs, idx_t snapshot_id, + string metadata_compression_codec, bool skip_schema_inference) { auto info = GetParseInfo(path, fs, metadata_compression_codec); auto snapshot = FindSnapshotByIdInternal(info->snapshots, snapshot_id); @@ -149,10 +151,12 @@ IcebergSnapshot IcebergSnapshot::GetSnapshotById(const string &path, FileSystem throw IOException("Could not find snapshot with id " + to_string(snapshot_id)); } - return ParseSnapShot(snapshot, info->iceberg_version, info->schema_id, info->schemas, metadata_compression_codec); + return ParseSnapShot(snapshot, info->iceberg_version, info->schema_id, info->schemas, + metadata_compression_codec, skip_schema_inference); } -IcebergSnapshot IcebergSnapshot::GetSnapshotByTimestamp(const string &path, FileSystem &fs, timestamp_t timestamp, string metadata_compression_codec) { +IcebergSnapshot IcebergSnapshot::GetSnapshotByTimestamp(const string &path, FileSystem &fs, timestamp_t timestamp, string metadata_compression_codec, + bool skip_schema_inference) { auto info = GetParseInfo(path, fs, metadata_compression_codec); auto snapshot = FindSnapshotByIdTimestampInternal(info->snapshots, timestamp); @@ -160,7 +164,7 @@ IcebergSnapshot IcebergSnapshot::GetSnapshotByTimestamp(const string &path, File throw IOException("Could not find latest snapshots for timestamp " + Timestamp::ToString(timestamp)); } - return ParseSnapShot(snapshot, info->iceberg_version, info->schema_id, info->schemas, metadata_compression_codec); + return ParseSnapShot(snapshot, info->iceberg_version, info->schema_id, info->schemas, metadata_compression_codec, skip_schema_inference); } // Function to generate a metadata file url @@ -188,7 +192,8 @@ string IcebergSnapshot::ReadMetaData(const string &path, FileSystem &fs, string } IcebergSnapshot IcebergSnapshot::ParseSnapShot(yyjson_val *snapshot, idx_t iceberg_format_version, idx_t schema_id, - vector &schemas, string metadata_compression_codec) { + vector &schemas, string metadata_compression_codec, + bool skip_schema_inference) { IcebergSnapshot ret; auto snapshot_tag = yyjson_get_tag(snapshot); if (snapshot_tag != YYJSON_TYPE_OBJ) { @@ -206,8 +211,9 @@ IcebergSnapshot IcebergSnapshot::ParseSnapShot(yyjson_val *snapshot, idx_t icebe ret.manifest_list = IcebergUtils::TryGetStrFromObject(snapshot, "manifest-list"); ret.iceberg_format_version = iceberg_format_version; ret.schema_id = schema_id; - ret.schema = ParseSchema(schemas, ret.schema_id); - + if (!skip_schema_inference) { + ret.schema = ParseSchema(schemas, ret.schema_id); + } return ret; } diff --git a/src/iceberg_functions/iceberg_metadata.cpp b/src/iceberg_functions/iceberg_metadata.cpp index ed41b31..2b5f857 100644 --- a/src/iceberg_functions/iceberg_metadata.cpp +++ b/src/iceberg_functions/iceberg_metadata.cpp @@ -56,6 +56,7 @@ static unique_ptr IcebergMetaDataBind(ClientContext &context, Tabl bool allow_moved_paths = false; string metadata_compression_codec = "none"; + bool skip_schema_inference = false; for (auto &kv : input.named_parameters) { auto loption = StringUtil::Lower(kv.first); @@ -63,21 +64,22 @@ static unique_ptr IcebergMetaDataBind(ClientContext &context, Tabl allow_moved_paths = BooleanValue::Get(kv.second); } else if (loption == "metadata_compression_codec") { metadata_compression_codec = StringValue::Get(kv.second); + } else if (loption == "skip_schema_inference") { + skip_schema_inference = BooleanValue::Get(kv.second); } } - IcebergSnapshot snapshot_to_scan; if (input.inputs.size() > 1) { if (input.inputs[1].type() == LogicalType::UBIGINT) { - snapshot_to_scan = IcebergSnapshot::GetSnapshotById(iceberg_path, fs, input.inputs[1].GetValue(), metadata_compression_codec); + snapshot_to_scan = IcebergSnapshot::GetSnapshotById(iceberg_path, fs, input.inputs[1].GetValue(), metadata_compression_codec, skip_schema_inference); } else if (input.inputs[1].type() == LogicalType::TIMESTAMP) { snapshot_to_scan = - IcebergSnapshot::GetSnapshotByTimestamp(iceberg_path, fs, input.inputs[1].GetValue(), metadata_compression_codec); + IcebergSnapshot::GetSnapshotByTimestamp(iceberg_path, fs, input.inputs[1].GetValue(), metadata_compression_codec, skip_schema_inference); } else { throw InvalidInputException("Unknown argument type in IcebergScanBindReplace."); } } else { - snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_path, fs, metadata_compression_codec); + snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_path, fs, metadata_compression_codec, skip_schema_inference); } ret->iceberg_table = @@ -141,18 +143,21 @@ TableFunctionSet IcebergFunctions::GetIcebergMetadataFunction() { auto fun = TableFunction({LogicalType::VARCHAR}, IcebergMetaDataFunction, IcebergMetaDataBind, IcebergMetaDataGlobalTableFunctionState::Init); + fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN; fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN; fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR; function_set.AddFunction(fun); fun = TableFunction({LogicalType::VARCHAR, LogicalType::UBIGINT}, IcebergMetaDataFunction, IcebergMetaDataBind, IcebergMetaDataGlobalTableFunctionState::Init); + fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN; fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN; fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR; function_set.AddFunction(fun); fun = TableFunction({LogicalType::VARCHAR, LogicalType::TIMESTAMP}, IcebergMetaDataFunction, IcebergMetaDataBind, IcebergMetaDataGlobalTableFunctionState::Init); + fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN; fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN; fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR; function_set.AddFunction(fun); diff --git a/src/iceberg_functions/iceberg_scan.cpp b/src/iceberg_functions/iceberg_scan.cpp index c0dabbb..fdb4934 100644 --- a/src/iceberg_functions/iceberg_scan.cpp +++ b/src/iceberg_functions/iceberg_scan.cpp @@ -129,16 +129,18 @@ static Value GetParquetSchemaParam(vector &schema) { //! Build the Parquet Scan expression for the files we need to scan static unique_ptr MakeScanExpression(vector &data_file_values, vector &delete_file_values, - vector &schema, bool allow_moved_paths, string metadata_compression_codec) { + vector &schema, bool allow_moved_paths, string metadata_compression_codec, bool skip_schema_inference) { // No deletes, just return a TableFunctionRef for a parquet scan of the data files if (delete_file_values.empty()) { auto table_function_ref_data = make_uniq(); table_function_ref_data->alias = "iceberg_scan_data"; vector> left_children; left_children.push_back(make_uniq(Value::LIST(data_file_values))); - left_children.push_back( - make_uniq(ExpressionType::COMPARE_EQUAL, make_uniq("schema"), - make_uniq(GetParquetSchemaParam(schema)))); + if (!skip_schema_inference) { + left_children.push_back( + make_uniq(ExpressionType::COMPARE_EQUAL, make_uniq("schema"), + make_uniq(GetParquetSchemaParam(schema)))); + } table_function_ref_data->function = make_uniq("parquet_scan", std::move(left_children)); return std::move(table_function_ref_data); } @@ -169,10 +171,11 @@ static unique_ptr MakeScanExpression(vector &data_file_values, left_children.push_back(make_uniq(ExpressionType::COMPARE_EQUAL, make_uniq("file_row_number"), make_uniq(Value(1)))); - left_children.push_back( - make_uniq(ExpressionType::COMPARE_EQUAL, make_uniq("schema"), - make_uniq(GetParquetSchemaParam(schema)))); - + if (!skip_schema_inference) { + left_children.push_back( + make_uniq(ExpressionType::COMPARE_EQUAL, make_uniq("schema"), + make_uniq(GetParquetSchemaParam(schema)))); + } table_function_ref_data->function = make_uniq("parquet_scan", std::move(left_children)); join_node->left = std::move(table_function_ref_data); @@ -208,6 +211,7 @@ static unique_ptr IcebergScanBindReplace(ClientContext &context, Table // this allows hive tables to be moved and have mismatching paths, usefull for testing, but will have worse // performance bool allow_moved_paths = false; + bool skip_schema_inference = false; string mode = "default"; string metadata_compression_codec = "none"; @@ -223,21 +227,22 @@ static unique_ptr IcebergScanBindReplace(ClientContext &context, Table mode = StringValue::Get(kv.second); } else if (loption == "metadata_compression_codec") { metadata_compression_codec = StringValue::Get(kv.second); + } else if (loption == "skip_schema_inference") { + skip_schema_inference = BooleanValue::Get(kv.second); } } - IcebergSnapshot snapshot_to_scan; if (input.inputs.size() > 1) { if (input.inputs[1].type() == LogicalType::UBIGINT) { - snapshot_to_scan = IcebergSnapshot::GetSnapshotById(iceberg_path, fs, input.inputs[1].GetValue(), metadata_compression_codec); + snapshot_to_scan = IcebergSnapshot::GetSnapshotById(iceberg_path, fs, input.inputs[1].GetValue(), metadata_compression_codec, skip_schema_inference); } else if (input.inputs[1].type() == LogicalType::TIMESTAMP) { snapshot_to_scan = - IcebergSnapshot::GetSnapshotByTimestamp(iceberg_path, fs, input.inputs[1].GetValue(), metadata_compression_codec); + IcebergSnapshot::GetSnapshotByTimestamp(iceberg_path, fs, input.inputs[1].GetValue(), metadata_compression_codec, skip_schema_inference); } else { throw InvalidInputException("Unknown argument type in IcebergScanBindReplace."); } } else { - snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_path, fs, metadata_compression_codec); + snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_path, fs, metadata_compression_codec, skip_schema_inference); } IcebergTable iceberg_table = IcebergTable::Load(iceberg_path, snapshot_to_scan, fs, allow_moved_paths, metadata_compression_codec); @@ -257,7 +262,7 @@ static unique_ptr IcebergScanBindReplace(ClientContext &context, Table if (mode == "list_files") { return MakeListFilesExpression(data_file_values, delete_file_values); } else if (mode == "default") { - return MakeScanExpression(data_file_values, delete_file_values, snapshot_to_scan.schema, allow_moved_paths, metadata_compression_codec); + return MakeScanExpression(data_file_values, delete_file_values, snapshot_to_scan.schema, allow_moved_paths, metadata_compression_codec, skip_schema_inference); } else { throw NotImplementedException("Unknown mode type for ICEBERG_SCAN bind : '" + mode + "'"); } @@ -268,6 +273,7 @@ TableFunctionSet IcebergFunctions::GetIcebergScanFunction() { auto fun = TableFunction({LogicalType::VARCHAR}, nullptr, nullptr, IcebergScanGlobalTableFunctionState::Init); fun.bind_replace = IcebergScanBindReplace; + fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN; fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN; fun.named_parameters["mode"] = LogicalType::VARCHAR; fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR; @@ -276,6 +282,7 @@ TableFunctionSet IcebergFunctions::GetIcebergScanFunction() { fun = TableFunction({LogicalType::VARCHAR, LogicalType::UBIGINT}, nullptr, nullptr, IcebergScanGlobalTableFunctionState::Init); fun.bind_replace = IcebergScanBindReplace; + fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN; fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN; fun.named_parameters["mode"] = LogicalType::VARCHAR; fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR; @@ -284,6 +291,7 @@ TableFunctionSet IcebergFunctions::GetIcebergScanFunction() { fun = TableFunction({LogicalType::VARCHAR, LogicalType::TIMESTAMP}, nullptr, nullptr, IcebergScanGlobalTableFunctionState::Init); fun.bind_replace = IcebergScanBindReplace; + fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN; fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN; fun.named_parameters["mode"] = LogicalType::VARCHAR; fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR; diff --git a/src/iceberg_functions/iceberg_snapshots.cpp b/src/iceberg_functions/iceberg_snapshots.cpp index c778038..ec76b42 100644 --- a/src/iceberg_functions/iceberg_snapshots.cpp +++ b/src/iceberg_functions/iceberg_snapshots.cpp @@ -13,6 +13,7 @@ struct IcebergSnaphotsBindData : public TableFunctionData { IcebergSnaphotsBindData() {}; string filename; string metadata_compression_codec; + bool skip_schema_inference = false; }; struct IcebergSnapshotGlobalTableFunctionState : public GlobalTableFunctionState { @@ -49,15 +50,19 @@ static unique_ptr IcebergSnapshotsBind(ClientContext &context, Tab auto bind_data = make_uniq(); string metadata_compression_codec = "none"; + bool skip_schema_inference = false; for (auto &kv : input.named_parameters) { auto loption = StringUtil::Lower(kv.first); if (loption == "metadata_compression_codec") { metadata_compression_codec = StringValue::Get(kv.second); + } else if (loption == "skip_schema_inference") { + skip_schema_inference = BooleanValue::Get(kv.second); } } bind_data->filename = input.inputs[0].ToString(); bind_data->metadata_compression_codec = metadata_compression_codec; + bind_data->skip_schema_inference = skip_schema_inference; names.emplace_back("sequence_number"); return_types.emplace_back(LogicalType::UBIGINT); @@ -88,9 +93,11 @@ static void IcebergSnapshotsFunction(ClientContext &context, TableFunctionInput break; } + auto parse_info = IcebergSnapshot::GetParseInfo(*global_state.metadata_doc); auto snapshot = IcebergSnapshot::ParseSnapShot(next_snapshot, global_state.iceberg_format_version, - parse_info->schema_id, parse_info->schemas, bind_data.metadata_compression_codec); + parse_info->schema_id, parse_info->schemas, bind_data.metadata_compression_codec, + bind_data.skip_schema_inference); FlatVector::GetData(output.data[0])[i] = snapshot.sequence_number; FlatVector::GetData(output.data[1])[i] = snapshot.snapshot_id; @@ -108,6 +115,7 @@ TableFunctionSet IcebergFunctions::GetIcebergSnapshotsFunction() { TableFunction table_function({LogicalType::VARCHAR}, IcebergSnapshotsFunction, IcebergSnapshotsBind, IcebergSnapshotGlobalTableFunctionState::Init); table_function.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR; + table_function.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN; function_set.AddFunction(table_function); return std::move(function_set); } diff --git a/src/include/iceberg_metadata.hpp b/src/include/iceberg_metadata.hpp index 7c92847..9dd142d 100644 --- a/src/include/iceberg_metadata.hpp +++ b/src/include/iceberg_metadata.hpp @@ -59,12 +59,12 @@ class IcebergSnapshot { vector schema; string metadata_compression_codec = "none"; - static IcebergSnapshot GetLatestSnapshot(const string &path, FileSystem &fs, string GetSnapshotByTimestamp); - static IcebergSnapshot GetSnapshotById(const string &path, FileSystem &fs, idx_t snapshot_id, string GetSnapshotByTimestamp); - static IcebergSnapshot GetSnapshotByTimestamp(const string &path, FileSystem &fs, timestamp_t timestamp, string GetSnapshotByTimestamp); + static IcebergSnapshot GetLatestSnapshot(const string &path, FileSystem &fs, string GetSnapshotByTimestamp, bool skip_schema_inference); + static IcebergSnapshot GetSnapshotById(const string &path, FileSystem &fs, idx_t snapshot_id, string GetSnapshotByTimestamp, bool skip_schema_inference); + static IcebergSnapshot GetSnapshotByTimestamp(const string &path, FileSystem &fs, timestamp_t timestamp, string GetSnapshotByTimestamp, bool skip_schema_inference); static IcebergSnapshot ParseSnapShot(yyjson_val *snapshot, idx_t iceberg_format_version, idx_t schema_id, - vector &schemas, string metadata_compression_codec); + vector &schemas, string metadata_compression_codec, bool skip_schema_inference); static string ReadMetaData(const string &path, FileSystem &fs, string GetSnapshotByTimestamp); static yyjson_val *GetSnapshots(const string &path, FileSystem &fs, string GetSnapshotByTimestamp); static unique_ptr GetParseInfo(yyjson_doc &metadata_json);