Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support source watermark for Flink SQL windows #12191

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsSourceWatermark;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.types.DataType;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.FlinkFilters;
import org.apache.iceberg.flink.FlinkReadOptions;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.assigner.SplitAssignerType;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
Expand All @@ -53,7 +55,8 @@ public class IcebergTableSource
implements ScanTableSource,
SupportsProjectionPushDown,
SupportsFilterPushDown,
SupportsLimitPushDown {
SupportsLimitPushDown,
SupportsSourceWatermark {

private int[] projectedFields;
private Long limit;
Expand Down Expand Up @@ -175,6 +178,18 @@ public Result applyFilters(List<ResolvedExpression> flinkFilters) {
return Result.of(acceptedFilters, flinkFilters);
}

@Override
public void applySourceWatermark() {
  // SOURCE_WATERMARK() is only honored by the FLIP-27 based source; reject the
  // request up front when the legacy source implementation is configured.
  boolean flip27Enabled =
      readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE);
  if (!flip27Enabled) {
    throw new UnsupportedOperationException(
        "Source watermarks are supported only in flip-27 iceberg source implementation");
  }

  // The source cannot emit watermarks without knowing which column drives them.
  Preconditions.checkNotNull(
      properties.get(FlinkReadOptions.WATERMARK_COLUMN),
      "watermark-column needs to be configured to use source watermark.");
}

@Override
public boolean supportsNestedProjection() {
// TODO: support nested projection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ protected void dropDatabase(String database, boolean ifExists) {
sql("DROP DATABASE %s %s", ifExists ? "IF EXISTS" : "", database);
}

protected static String toWithClause(Map<String, String> props) {
public static String toWithClause(Map<String, String> props) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this change? Did this become a utility method which is shared between tests? Can we find a better place for it then?

StringBuilder builder = new StringBuilder();
builder.append("(");
int propCount = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
package org.apache.iceberg.flink.source;

import static org.apache.iceberg.types.Types.NestedField.required;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.util.List;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.TableConfigOptions;
Expand All @@ -35,11 +37,15 @@
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.SqlBase;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -72,6 +78,11 @@ public void before() throws IOException {
tableConf.set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true);
}

// Drops the Iceberg table after each test so every test starts from a clean catalog.
@AfterEach
public void after() throws IOException {
  CATALOG_EXTENSION.catalog().dropTable(TestFixtures.TABLE_IDENTIFIER);
}

private Record generateRecord(Instant t1, long t2) {
Record record = GenericRecord.create(SCHEMA_TS);
record.setField("t1", t1.atZone(ZoneId.systemDefault()).toLocalDateTime());
Expand Down Expand Up @@ -162,4 +173,74 @@ public void testWatermarkOptionsDescending() throws Exception {
expected,
SCHEMA_TS);
}

@Test
public void testWatermarkInvalidConfig() {
CATALOG_EXTENSION.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, SCHEMA_TS);

String flinkTable = "`default_catalog`.`default_database`.flink_table";
try {
SqlHelpers.sql(
getStreamingTableEnv(),
"CREATE TABLE %s "
+ "(t1 TIMESTAMP(6), "
+ "t2 BIGINT,"
+ "eventTS AS CAST(t1 AS TIMESTAMP(3)), "
+ "WATERMARK FOR eventTS AS SOURCE_WATERMARK()) WITH %s",
flinkTable,
SqlBase.toWithClause(getConnectorOptions()));

assertThatThrownBy(
() -> SqlHelpers.sql(getStreamingTableEnv(), "SELECT * FROM %s", flinkTable))
.isInstanceOf(NullPointerException.class)
.hasMessage("watermark-column needs to be configured to use source watermark.");
} finally {
SqlHelpers.sql(getStreamingTableEnv(), "DROP TABLE IF EXISTS %s", flinkTable);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this try finally? Can we just do a cleanup in an after method? That would make the tests easier to read

}
}

@Test
public void testWatermarkValidConfig() throws Exception {
  // With watermark-column configured, SOURCE_WATERMARK() is usable and a tumbling
  // window query returns the records in the expected order.
  List<Record> expected = generateExpectedRecords(true);

  Map<String, String> options = Maps.newHashMap(getConnectorOptions());
  options.put("watermark-column", "t1");

  String tableName = "`default_catalog`.`default_database`.flink_table";
  SqlHelpers.sql(
      getStreamingTableEnv(),
      "CREATE TABLE %s "
          + "(t1 TIMESTAMP(6), "
          + "t2 BIGINT,"
          + "eventTS AS CAST(t1 AS TIMESTAMP(3)), "
          + "WATERMARK FOR eventTS AS SOURCE_WATERMARK()) WITH %s",
      tableName,
      SqlBase.toWithClause(options));

  TestHelpers.assertRecordsWithOrder(
      SqlHelpers.sql(
          getStreamingTableEnv(),
          "SELECT t1, t2 FROM TABLE(TUMBLE(TABLE %s, DESCRIPTOR(eventTS), INTERVAL '1' SECOND))",
          tableName),
      expected,
      SCHEMA_TS);
}

@NotNull
private static Map<String, String> getConnectorOptions() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we do the PRs in a different order (first merging the optionless part), could we get rid of this method?

return Map.of(
"connector",
"iceberg",
"catalog-type",
"hadoop",
"catalog-name",
"iceberg_catalog",
"catalog-database",
TestFixtures.DATABASE,
"catalog-table",
TestFixtures.TABLE,
"warehouse",
CATALOG_EXTENSION.warehouse());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public abstract class TestSqlBase {

private volatile TableEnvironment tEnv;

private volatile TableEnvironment streamingTEnv;

protected TableEnvironment getTableEnv() {
if (tEnv == null) {
synchronized (this) {
Expand All @@ -75,6 +77,18 @@ protected TableEnvironment getTableEnv() {
return tEnv;
}

// Lazily creates a streaming-mode TableEnvironment (the sibling getTableEnv() builds one
// with default settings). Uses double-checked locking on the volatile streamingTEnv field
// so concurrent callers construct the environment at most once.
protected TableEnvironment getStreamingTableEnv() {
  if (streamingTEnv == null) {
    synchronized (this) {
      // Re-check under the lock: another thread may have won the race.
      if (streamingTEnv == null) {
        this.streamingTEnv =
            TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
      }
    }
  }
  return streamingTEnv;
}

@BeforeEach
public abstract void before() throws IOException;

Expand Down