From ebfd93f91081daf68c59806d26a270b324242411 Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Wed, 8 Jan 2025 10:33:13 +0900 Subject: [PATCH 1/3] Test timestamp parsing in Delta Lake --- .../transactionlog/TransactionLogParser.java | 4 +++- .../transactionlog/TestTransactionLogParser.java | 15 +++++++++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogParser.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogParser.java index e67d06cd1088..a71de00a73ac 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogParser.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogParser.java @@ -16,6 +16,7 @@ import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import dev.failsafe.Failsafe; import dev.failsafe.RetryPolicy; import io.airlift.json.ObjectMapperProvider; @@ -180,7 +181,8 @@ private static Long readPartitionTimestamp(String timestamp) return localDateTime.toEpochSecond(UTC) * MICROSECONDS_PER_SECOND + divide(localDateTime.getNano(), NANOSECONDS_PER_MICROSECOND, UNNECESSARY); } - private static Long readPartitionTimestampWithZone(String timestamp) + @VisibleForTesting + static Long readPartitionTimestampWithZone(String timestamp) { ZonedDateTime zonedDateTime = LocalDateTime.parse(timestamp, PARTITION_TIMESTAMP_FORMATTER).atZone(UTC); return packDateTimeWithZone(zonedDateTime.toInstant().toEpochMilli(), UTC_KEY); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTransactionLogParser.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTransactionLogParser.java 
index 82504ca6fb33..895a493fe0f3 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTransactionLogParser.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTransactionLogParser.java @@ -20,6 +20,7 @@ import static io.trino.plugin.deltalake.DeltaTestingConnectorSession.SESSION; import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.getMandatoryCurrentVersion; +import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.readPartitionTimestampWithZone; import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT; import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_STATS; import static org.assertj.core.api.Assertions.assertThat; @@ -40,4 +41,18 @@ public void testGetCurrentVersion() assertThat(getMandatoryCurrentVersion(fileSystem, basePath + "/simple_table_past_checkpoint", 10)).isEqualTo(11); assertThat(getMandatoryCurrentVersion(fileSystem, basePath + "/simple_table_past_checkpoint", 11)).isEqualTo(11); } + + @Test + void testReadPartitionTimestampWithZone() + { + assertThat(readPartitionTimestampWithZone("1970-01-01 00:00:00")).isEqualTo(0L); + assertThat(readPartitionTimestampWithZone("1970-01-01 00:00:00.1")).isEqualTo(409600L); + assertThat(readPartitionTimestampWithZone("1970-01-01 00:00:00.01")).isEqualTo(40960L); + assertThat(readPartitionTimestampWithZone("1970-01-01 00:00:00.001")).isEqualTo(4096L); + + // https://github.com/trinodb/trino/issues/20359 Increase timestamp precision to microseconds + assertThat(readPartitionTimestampWithZone("1970-01-01 00:00:00.0001")).isEqualTo(0L); + assertThat(readPartitionTimestampWithZone("1970-01-01 00:00:00.00001")).isEqualTo(0L); + assertThat(readPartitionTimestampWithZone("1970-01-01 00:00:00.000001")).isEqualTo(0L); + } } From b69bd0f4ec7deaa994679284be6b6451c27a20eb Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Wed, 8 Jan 2025 10:34:10 +0900 Subject: [PATCH 2/3] Allow 
parsing ISO8601 timestamp in Delta Lake transaction log --- .../deltalake/transactionlog/TransactionLogParser.java | 10 +++++++++- .../transactionlog/TestTransactionLogParser.java | 7 +++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogParser.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogParser.java index a71de00a73ac..640037c39b7a 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogParser.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogParser.java @@ -83,6 +83,7 @@ import static java.math.RoundingMode.UNNECESSARY; import static java.time.ZoneOffset.UTC; import static java.time.format.DateTimeFormatter.ISO_LOCAL_TIME; +import static java.time.format.DateTimeFormatter.ISO_ZONED_DATE_TIME; import static java.time.temporal.ChronoField.DAY_OF_MONTH; import static java.time.temporal.ChronoField.MONTH_OF_YEAR; import static java.time.temporal.ChronoField.YEAR; @@ -184,7 +185,14 @@ private static Long readPartitionTimestamp(String timestamp) @VisibleForTesting static Long readPartitionTimestampWithZone(String timestamp) { - ZonedDateTime zonedDateTime = LocalDateTime.parse(timestamp, PARTITION_TIMESTAMP_FORMATTER).atZone(UTC); + ZonedDateTime zonedDateTime; + try { + zonedDateTime = LocalDateTime.parse(timestamp, PARTITION_TIMESTAMP_FORMATTER).atZone(UTC); + } + catch (DateTimeParseException _) { + // TODO: avoid this exception-driven logic + zonedDateTime = ZonedDateTime.parse(timestamp, ISO_ZONED_DATE_TIME); + } return packDateTimeWithZone(zonedDateTime.toInstant().toEpochMilli(), UTC_KEY); } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTransactionLogParser.java 
b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTransactionLogParser.java index 895a493fe0f3..613bb594f895 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTransactionLogParser.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTransactionLogParser.java @@ -55,4 +55,11 @@ void testReadPartitionTimestampWithZone() assertThat(readPartitionTimestampWithZone("1970-01-01 00:00:00.00001")).isEqualTo(0L); assertThat(readPartitionTimestampWithZone("1970-01-01 00:00:00.000001")).isEqualTo(0L); } + + @Test + void testReadPartitionTimestampWithZoneIso8601() + { + assertThat(readPartitionTimestampWithZone("1970-01-01T00:00:00.000000Z")).isEqualTo(0L); + assertThat(readPartitionTimestampWithZone("1970-01-01T01:00:00.000000+01:00")).isEqualTo(0L); + } } From 7650a7108566329946cd6b8ededee0637e0e4ce4 Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Tue, 7 Jan 2025 16:15:29 +0900 Subject: [PATCH 3/3] Update docker-images to 108 --- .../DeltaLakeSchemaSupport.java | 16 ++++---- pom.xml | 2 +- .../TestDeltaLakeColumnMappingMode.java | 39 ++++++++----------- ...TestDeltaLakeSystemTableCompatibility.java | 2 +- 4 files changed, 27 insertions(+), 32 deletions(-) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java index 67a73f8ad6a9..d9fc72df070a 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java @@ -198,13 +198,15 @@ public static ColumnMappingMode getColumnMappingMode(MetadataEntry metadata, Pro return ColumnMappingMode.NAME; } - boolean supportsColumnMappingReader = 
protocolEntry.readerFeaturesContains(COLUMN_MAPPING_FEATURE_NAME); - boolean supportsColumnMappingWriter = protocolEntry.writerFeaturesContains(COLUMN_MAPPING_FEATURE_NAME); - checkArgument( - supportsColumnMappingReader == supportsColumnMappingWriter, - "Both reader and writer features must have the same value for 'columnMapping'. reader: %s, writer: %s", supportsColumnMappingReader, supportsColumnMappingWriter); - if (!supportsColumnMappingReader) { - return ColumnMappingMode.NONE; + if (protocolEntry.supportsReaderFeatures() && protocolEntry.supportsWriterFeatures()) { + boolean supportsColumnMappingReader = protocolEntry.readerFeaturesContains(COLUMN_MAPPING_FEATURE_NAME); + boolean supportsColumnMappingWriter = protocolEntry.writerFeaturesContains(COLUMN_MAPPING_FEATURE_NAME); + checkArgument( + supportsColumnMappingReader == supportsColumnMappingWriter, + "Both reader and writer features must have the same value for 'columnMapping'. reader: %s, writer: %s", supportsColumnMappingReader, supportsColumnMappingWriter); + if (!supportsColumnMappingReader) { + return ColumnMappingMode.NONE; + } } } String columnMappingMode = metadata.getConfiguration().getOrDefault(COLUMN_MAPPING_MODE_CONFIGURATION_KEY, "none"); diff --git a/pom.xml b/pom.xml index 3d59e0397fbd..d9341f4c040f 100644 --- a/pom.xml +++ b/pom.xml @@ -191,7 +191,7 @@ 1.12.780 4.17.0 7.7.1 - 107 + 108 1.22 11.1.0 1.15.1 diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeColumnMappingMode.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeColumnMappingMode.java index 965eda2d4df6..9037f8c833a9 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeColumnMappingMode.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeColumnMappingMode.java @@ -45,7 +45,6 @@ import static 
io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getTablePropertyOnDelta; import static io.trino.tests.product.utils.QueryExecutors.onDelta; import static io.trino.tests.product.utils.QueryExecutors.onTrino; -import static java.util.Locale.ENGLISH; import static java.util.Objects.requireNonNull; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -106,20 +105,12 @@ public void testColumnMappingModeTableFeature(String mode) .contains(entry("delta.feature.columnMapping", "supported")); // Unsetting delta.columnMapping.mode means changing to 'none' column mapping mode - if (mode.equals("none")) { - onDelta().executeQuery("ALTER TABLE default." + tableName + " UNSET TBLPROPERTIES ('delta.columnMapping.mode')"); - assertThat(getTablePropertiesOnDelta("default", tableName)) - .contains(entry("delta.feature.columnMapping", "supported")) - .doesNotContainKey("delta.columnMapping.mode"); - assertThat((String) onTrino().executeQuery("SHOW CREATE TABLE delta.default." + tableName).getOnlyValue()) - .doesNotContain("column_mapping_mode ="); - } - else { - assertQueryFailure(() -> onDelta().executeQuery("ALTER TABLE default." + tableName + " UNSET TBLPROPERTIES ('delta.columnMapping.mode')")) - .hasMessageContaining("Changing column mapping mode from '" + mode + "' to 'none' is not supported"); - assertThat((String) onTrino().executeQuery("SHOW CREATE TABLE delta.default." + tableName).getOnlyValue()) - .contains("column_mapping_mode = '" + mode.toUpperCase(ENGLISH) + "'"); - } + onDelta().executeQuery("ALTER TABLE default." + tableName + " UNSET TBLPROPERTIES ('delta.columnMapping.mode')"); + assertThat(getTablePropertiesOnDelta("default", tableName)) + .contains(entry("delta.feature.columnMapping", "supported")) + .doesNotContainKey("delta.columnMapping.mode"); + assertThat((String) onTrino().executeQuery("SHOW CREATE TABLE delta.default." 
+ tableName).getOnlyValue()) + .doesNotContain("column_mapping_mode ="); } finally { dropDeltaTableWithRetry("default." + tableName); @@ -135,7 +126,8 @@ public void testTrinoColumnMappingModeReaderAndWriterVersion(String mode) "WITH (" + " location = 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'," + " column_mapping_mode = '" + mode + "'" + - ")")); + ")"), + 5); } @Test(groups = {DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingDataProvider") @@ -196,7 +188,7 @@ public void testChangingColumnMappingModeViaCreateOrReplaceTableOnDelta(String m "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" + "TBLPROPERTIES ('delta.columnMapping.mode'='" + mode + "')"); - assertTableReaderAndWriterVersion("default", tableName, "2", "5"); + assertTableReaderAndWriterVersion("default", tableName, "2", "7"); // Revert back to `none` column mode onDelta().executeQuery("" + @@ -206,7 +198,7 @@ public void testChangingColumnMappingModeViaCreateOrReplaceTableOnDelta(String m "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" + "TBLPROPERTIES ('delta.columnMapping.mode'='none')"); - assertTableReaderAndWriterVersion("default", tableName, "2", "5"); + assertTableReaderAndWriterVersion("default", tableName, "2", "7"); onTrino().executeQuery("DROP TABLE delta.default." 
+ tableName); } @@ -219,16 +211,17 @@ public void testDeltaColumnMappingModeReaderAndWriterVersion(String mode) "(x INT) " + "USING delta " + "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" + - "TBLPROPERTIES ('delta.columnMapping.mode'='" + mode + "')")); + "TBLPROPERTIES ('delta.columnMapping.mode'='" + mode + "')"), + 7); } - private void testColumnMappingModeReaderAndWriterVersion(Consumer createTable) + private void testColumnMappingModeReaderAndWriterVersion(Consumer createTable, int expectedMinWriterVersion) { String tableName = "test_dl_column_mapping_version_" + randomNameSuffix(); createTable.accept(tableName); - assertTableReaderAndWriterVersion("default", tableName, "2", "5"); + assertTableReaderAndWriterVersion("default", tableName, "2", Integer.toString(expectedMinWriterVersion)); onTrino().executeQuery("DROP TABLE delta.default." + tableName); } @@ -845,9 +838,9 @@ public Object[][] changeColumnMappingDataProvider() // sourceMappingMode targetMappingMode supported {"none", "id", false}, {"none", "name", true}, - {"id", "none", false}, + {"id", "none", true}, {"id", "name", false}, - {"name", "none", false}, + {"name", "none", true}, {"name", "id", false}, }; } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeSystemTableCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeSystemTableCompatibility.java index 63417308ebb7..842737d9ed3c 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeSystemTableCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeSystemTableCompatibility.java @@ -79,7 +79,7 @@ public void testTablePropertiesWithTableFeatures() List expectedRows = ImmutableList.of( row("delta.columnMapping.mode", "id"), row("delta.feature.columnMapping", "supported"), - row("delta.minReaderVersion", 
"3"), + row("delta.minReaderVersion", "2"), // https://github.com/delta-io/delta/issues/4024 Delta Lake 3.3.0 ignores minReaderVersion row("delta.minWriterVersion", "7")); try { QueryResult deltaResult = onDelta().executeQuery("SHOW TBLPROPERTIES default." + tableName);