Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow parsing ISO8601 timestamp in Delta lake transaction log + Update docker-images to 108 #24635

Merged
merged 3 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -198,13 +198,15 @@ public static ColumnMappingMode getColumnMappingMode(MetadataEntry metadata, Pro
return ColumnMappingMode.NAME;
}

boolean supportsColumnMappingReader = protocolEntry.readerFeaturesContains(COLUMN_MAPPING_FEATURE_NAME);
boolean supportsColumnMappingWriter = protocolEntry.writerFeaturesContains(COLUMN_MAPPING_FEATURE_NAME);
checkArgument(
supportsColumnMappingReader == supportsColumnMappingWriter,
"Both reader and writer features must have the same value for 'columnMapping'. reader: %s, writer: %s", supportsColumnMappingReader, supportsColumnMappingWriter);
if (!supportsColumnMappingReader) {
return ColumnMappingMode.NONE;
if (protocolEntry.supportsReaderFeatures() && protocolEntry.supportsWriterFeatures()) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is required because Delta protocol allows a combination of minReaderVersion:2 and minWriterVersion:7.

boolean supportsColumnMappingReader = protocolEntry.readerFeaturesContains(COLUMN_MAPPING_FEATURE_NAME);
boolean supportsColumnMappingWriter = protocolEntry.writerFeaturesContains(COLUMN_MAPPING_FEATURE_NAME);
checkArgument(
supportsColumnMappingReader == supportsColumnMappingWriter,
"Both reader and writer features must have the same value for 'columnMapping'. reader: %s, writer: %s", supportsColumnMappingReader, supportsColumnMappingWriter);
if (!supportsColumnMappingReader) {
return ColumnMappingMode.NONE;
}
}
}
String columnMappingMode = metadata.getConfiguration().getOrDefault(COLUMN_MAPPING_MODE_CONFIGURATION_KEY, "none");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import io.airlift.json.ObjectMapperProvider;
Expand Down Expand Up @@ -82,6 +83,7 @@
import static java.math.RoundingMode.UNNECESSARY;
import static java.time.ZoneOffset.UTC;
import static java.time.format.DateTimeFormatter.ISO_LOCAL_TIME;
import static java.time.format.DateTimeFormatter.ISO_ZONED_DATE_TIME;
import static java.time.temporal.ChronoField.DAY_OF_MONTH;
import static java.time.temporal.ChronoField.MONTH_OF_YEAR;
import static java.time.temporal.ChronoField.YEAR;
Expand Down Expand Up @@ -180,9 +182,17 @@ private static Long readPartitionTimestamp(String timestamp)
return localDateTime.toEpochSecond(UTC) * MICROSECONDS_PER_SECOND + divide(localDateTime.getNano(), NANOSECONDS_PER_MICROSECOND, UNNECESSARY);
}

private static Long readPartitionTimestampWithZone(String timestamp)
@VisibleForTesting
static Long readPartitionTimestampWithZone(String timestamp)
{
ZonedDateTime zonedDateTime = LocalDateTime.parse(timestamp, PARTITION_TIMESTAMP_FORMATTER).atZone(UTC);
ZonedDateTime zonedDateTime;
try {
zonedDateTime = LocalDateTime.parse(timestamp, PARTITION_TIMESTAMP_FORMATTER).atZone(UTC);
}
catch (DateTimeParseException _) {
// TODO: avoid this exception-driven logic
zonedDateTime = ZonedDateTime.parse(timestamp, ISO_ZONED_DATE_TIME);
}
return packDateTimeWithZone(zonedDateTime.toInstant().toEpochMilli(), UTC_KEY);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static io.trino.plugin.deltalake.DeltaTestingConnectorSession.SESSION;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.getMandatoryCurrentVersion;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.readPartitionTimestampWithZone;
import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT;
import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_STATS;
import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -40,4 +41,25 @@ public void testGetCurrentVersion()
assertThat(getMandatoryCurrentVersion(fileSystem, basePath + "/simple_table_past_checkpoint", 10)).isEqualTo(11);
assertThat(getMandatoryCurrentVersion(fileSystem, basePath + "/simple_table_past_checkpoint", 11)).isEqualTo(11);
}

@Test
void testReadPartitionTimestampWithZone()
{
// Expected values are packed timestamp-with-zone encodings: epoch millis shifted
// left by the zone-key width, with the UTC zone key in the low bits — presumably
// matching packDateTimeWithZone (e.g. 100 ms << 12 == 409600). TODO confirm encoding.
assertThat(readPartitionTimestampWithZone("1970-01-01 00:00:00")).isEqualTo(0L);
assertThat(readPartitionTimestampWithZone("1970-01-01 00:00:00.1")).isEqualTo(409600L);
assertThat(readPartitionTimestampWithZone("1970-01-01 00:00:00.01")).isEqualTo(40960L);
assertThat(readPartitionTimestampWithZone("1970-01-01 00:00:00.001")).isEqualTo(4096L);

// Sub-millisecond digits are currently truncated to zero because the packed
// representation holds milliseconds only.
// https://github.com/trinodb/trino/issues/20359 Increase timestamp precision to microseconds
assertThat(readPartitionTimestampWithZone("1970-01-01 00:00:00.0001")).isEqualTo(0L);
assertThat(readPartitionTimestampWithZone("1970-01-01 00:00:00.00001")).isEqualTo(0L);
assertThat(readPartitionTimestampWithZone("1970-01-01 00:00:00.000001")).isEqualTo(0L);
}

@Test
void testReadPartitionTimestampWithZoneIso8601()
{
// ISO 8601 values carry an explicit zone/offset; all of these denote the epoch instant
assertThat(readPartitionTimestampWithZone("1970-01-01T00:00:00.000000Z")).isEqualTo(0L);
assertThat(readPartitionTimestampWithZone("1970-01-01T01:00:00.000000+01:00")).isEqualTo(0L);
// Negative offsets must normalize to the same instant as well
assertThat(readPartitionTimestampWithZone("1969-12-31T23:00:00.000000-01:00")).isEqualTo(0L);
}
}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@
<dep.aws-sdk.version>1.12.780</dep.aws-sdk.version>
<dep.cassandra.version>4.17.0</dep.cassandra.version>
<dep.confluent.version>7.7.1</dep.confluent.version>
<dep.docker.images.version>107</dep.docker.images.version>
<dep.docker.images.version>108</dep.docker.images.version>
<dep.drift.version>1.22</dep.drift.version>
<dep.flyway.version>11.1.0</dep.flyway.version>
<dep.frontend-maven-plugin.version>1.15.1</dep.frontend-maven-plugin.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getTablePropertyOnDelta;
import static io.trino.tests.product.utils.QueryExecutors.onDelta;
import static io.trino.tests.product.utils.QueryExecutors.onTrino;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
Expand Down Expand Up @@ -106,20 +105,12 @@ public void testColumnMappingModeTableFeature(String mode)
.contains(entry("delta.feature.columnMapping", "supported"));

// Unsetting delta.columnMapping.mode means changing to 'none' column mapping mode
if (mode.equals("none")) {
onDelta().executeQuery("ALTER TABLE default." + tableName + " UNSET TBLPROPERTIES ('delta.columnMapping.mode')");
assertThat(getTablePropertiesOnDelta("default", tableName))
.contains(entry("delta.feature.columnMapping", "supported"))
.doesNotContainKey("delta.columnMapping.mode");
assertThat((String) onTrino().executeQuery("SHOW CREATE TABLE delta.default." + tableName).getOnlyValue())
.doesNotContain("column_mapping_mode =");
}
else {
assertQueryFailure(() -> onDelta().executeQuery("ALTER TABLE default." + tableName + " UNSET TBLPROPERTIES ('delta.columnMapping.mode')"))
.hasMessageContaining("Changing column mapping mode from '" + mode + "' to 'none' is not supported");
assertThat((String) onTrino().executeQuery("SHOW CREATE TABLE delta.default." + tableName).getOnlyValue())
.contains("column_mapping_mode = '" + mode.toUpperCase(ENGLISH) + "'");
}
onDelta().executeQuery("ALTER TABLE default." + tableName + " UNSET TBLPROPERTIES ('delta.columnMapping.mode')");
assertThat(getTablePropertiesOnDelta("default", tableName))
.contains(entry("delta.feature.columnMapping", "supported"))
.doesNotContainKey("delta.columnMapping.mode");
assertThat((String) onTrino().executeQuery("SHOW CREATE TABLE delta.default." + tableName).getOnlyValue())
.doesNotContain("column_mapping_mode =");
}
finally {
dropDeltaTableWithRetry("default." + tableName);
Expand All @@ -135,7 +126,8 @@ public void testTrinoColumnMappingModeReaderAndWriterVersion(String mode)
"WITH (" +
" location = 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'," +
" column_mapping_mode = '" + mode + "'" +
")"));
")"),
5);
}

@Test(groups = {DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingDataProvider")
Expand Down Expand Up @@ -196,7 +188,7 @@ public void testChangingColumnMappingModeViaCreateOrReplaceTableOnDelta(String m
"LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" +
"TBLPROPERTIES ('delta.columnMapping.mode'='" + mode + "')");

assertTableReaderAndWriterVersion("default", tableName, "2", "5");
assertTableReaderAndWriterVersion("default", tableName, "2", "7");

// Revert back to `none` column mode
onDelta().executeQuery("" +
Expand All @@ -206,7 +198,7 @@ public void testChangingColumnMappingModeViaCreateOrReplaceTableOnDelta(String m
"LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" +
"TBLPROPERTIES ('delta.columnMapping.mode'='none')");

assertTableReaderAndWriterVersion("default", tableName, "2", "5");
assertTableReaderAndWriterVersion("default", tableName, "2", "7");

onTrino().executeQuery("DROP TABLE delta.default." + tableName);
}
Expand All @@ -219,16 +211,17 @@ public void testDeltaColumnMappingModeReaderAndWriterVersion(String mode)
"(x INT) " +
"USING delta " +
"LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" +
"TBLPROPERTIES ('delta.columnMapping.mode'='" + mode + "')"));
"TBLPROPERTIES ('delta.columnMapping.mode'='" + mode + "')"),
7);
}

private void testColumnMappingModeReaderAndWriterVersion(Consumer<String> createTable)
private void testColumnMappingModeReaderAndWriterVersion(Consumer<String> createTable, int expectedMinWriterVersion)
{
String tableName = "test_dl_column_mapping_version_" + randomNameSuffix();

createTable.accept(tableName);

assertTableReaderAndWriterVersion("default", tableName, "2", "5");
assertTableReaderAndWriterVersion("default", tableName, "2", Integer.toString(expectedMinWriterVersion));

onTrino().executeQuery("DROP TABLE delta.default." + tableName);
}
Expand Down Expand Up @@ -845,9 +838,9 @@ public Object[][] changeColumnMappingDataProvider()
// sourceMappingMode targetMappingMode supported
{"none", "id", false},
{"none", "name", true},
{"id", "none", false},
{"id", "none", true},
{"id", "name", false},
{"name", "none", false},
{"name", "none", true},
{"name", "id", false},
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void testTablePropertiesWithTableFeatures()
List<Row> expectedRows = ImmutableList.of(
row("delta.columnMapping.mode", "id"),
row("delta.feature.columnMapping", "supported"),
row("delta.minReaderVersion", "3"),
row("delta.minReaderVersion", "2"), // https://github.com/delta-io/delta/issues/4024 Delta Lake 3.3.0 ignores minReaderVersion
row("delta.minWriterVersion", "7"));
try {
QueryResult deltaResult = onDelta().executeQuery("SHOW TBLPROPERTIES default." + tableName);
Expand Down
Loading