From 9b545d0077814ea9e13b49a8a3847277b98a06a7 Mon Sep 17 00:00:00 2001 From: Ahmed Hamdy Date: Tue, 5 Nov 2024 17:33:15 +0000 Subject: [PATCH] [FLINK-36651][Connectors/Elasticsearch] Fix IT test not compatible with 1.20, drop main branch tests for 1.18 --- .github/workflows/push_pr.yml | 9 ++---- .github/workflows/weekly.yml | 14 +++------ .../sink/ElasticsearchWriterITCase.java | 31 ++----------------- pom.xml | 2 +- 4 files changed, 9 insertions(+), 47 deletions(-) diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml index a7a38f6f..c0f25f3b 100644 --- a/.github/workflows/push_pr.yml +++ b/.github/workflows/push_pr.yml @@ -28,13 +28,8 @@ jobs: compile_and_test: strategy: matrix: - flink: [ 1.18-SNAPSHOT ] - jdk: [ '8, 11, 17' ] - include: - - flink: 1.19-SNAPSHOT - jdk: '8, 11, 17, 21' - - flink: 1.20-SNAPSHOT - jdk: '8, 11, 17, 21' + flink: [ 1.19-SNAPSHOT, 1.20-SNAPSHOT ] + jdk: [ '8, 11, 17, 21' ] uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils with: flink_version: ${{ matrix.flink }} diff --git a/.github/workflows/weekly.yml b/.github/workflows/weekly.yml index 6cd802c8..3f5bebac 100644 --- a/.github/workflows/weekly.yml +++ b/.github/workflows/weekly.yml @@ -30,27 +30,21 @@ jobs: strategy: matrix: flink_branches: [{ - flink: 1.18-SNAPSHOT, - jdk: '8, 11, 17', - branch: main - }, { flink: 1.19-SNAPSHOT, - jdk: '8, 11, 17, 21', branch: main }, { flink: 1.20-SNAPSHOT, - jdk: '8, 11, 17, 21', branch: main - }, { - flink: 1.18.1, - branch: v3.0 }, { flink: 1.19.0, branch: v3.0 + }, { + flink: 1.20.0, + branch: main }] uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils with: flink_version: ${{ matrix.flink_branches.flink }} connector_branch: ${{ matrix.flink_branches.branch }} - jdk_version: ${{ matrix.flink_branches.jdk || '8, 11' }} + jdk_version: ${{ matrix.flink_branches.jdk || '8, 11, 17, 21' }} run_dependency_convergence: false diff --git a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java index 248f6a84..1ff68a9d 100644 --- a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java +++ b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java @@ -17,7 +17,6 @@ package org.apache.flink.connector.elasticsearch.sink; -import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.api.connector.sink2.SinkWriter; import org.apache.flink.api.connector.sink2.SinkWriter.Context; import org.apache.flink.api.java.tuple.Tuple2; @@ -31,14 +30,13 @@ import org.apache.flink.metrics.groups.SinkWriterMetricGroup; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.metrics.testutils.MetricListener; +import org.apache.flink.runtime.mailbox.SyncMailboxExecutor; import org.apache.flink.runtime.metrics.MetricNames; import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.test.junit5.MiniClusterExtension; -import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.TestLoggerExtension; -import org.apache.flink.util.function.ThrowingRunnable; import org.apache.http.HttpHost; import org.elasticsearch.action.ActionListener; @@ -329,7 +327,7 @@ private static ElasticsearchWriter> createWriter( new DefaultBulkResponseInspector(), new NetworkClientConfig(null, null, null, null, null, null, null, null), metricGroup, - new TestMailbox()); + new SyncMailboxExecutor()); } private TestingSinkWriterMetricGroup getSinkWriterMetricGroup() { @@ -481,29 +479,4 @@ GetResponse getResponse(String index, int id) throws IOException { return client.get(new GetRequest(index, Integer.toString(id)), RequestOptions.DEFAULT); } } - - private static class TestMailbox implements MailboxExecutor { - - @Override - public void execute( - ThrowingRunnable command, - String descriptionFormat, - Object... descriptionArgs) { - try { - command.run(); - } catch (Exception e) { - throw new RuntimeException("Unexpected error", e); - } - } - - @Override - public void yield() throws InterruptedException, FlinkRuntimeException { - Thread.sleep(100); - } - - @Override - public boolean tryYield() throws FlinkRuntimeException { - return false; - } - } } diff --git a/pom.xml b/pom.xml index 3a27ceeb..a09b868c 100644 --- a/pom.xml +++ b/pom.xml @@ -51,7 +51,7 @@ under the License. - 1.18.0 + 1.20.0 2.15.3 4.13.2