From 1f82675de7cf87eb2cb6be43769db36a42908c76 Mon Sep 17 00:00:00 2001 From: Sergey Nuyanzin Date: Sat, 13 Apr 2024 15:15:55 +0200 Subject: [PATCH] [FLINK-35094] SinkTestSuiteBase.testScaleDown is hanging for 1.20-SNAPSHOT --- .../core/execution/CheckpointingMode.java | 30 ++++++++++++++ .../tests/ElasticsearchSinkE2ECaseBase.java | 41 ++++++++++++++++++- 2 files changed, 70 insertions(+), 1 deletion(-) create mode 100644 flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/core/execution/CheckpointingMode.java diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/core/execution/CheckpointingMode.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/core/execution/CheckpointingMode.java new file mode 100644 index 00000000..070a8622 --- /dev/null +++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/core/execution/CheckpointingMode.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.execution; + +/** + * This is a copy of {@link CheckpointingMode} from flink-core module introduced in Flink 1.20. We + * need it here to make E2E Sink tests compatible with earlier releases. Could be removed together + * with dropping support of Flink 1.19. + */ +public enum CheckpointingMode { + EXACTLY_ONCE, + AT_LEAST_ONCE; + + private CheckpointingMode() {} +} diff --git a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchSinkE2ECaseBase.java b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchSinkE2ECaseBase.java index b6f4a9f3..6f319851 100644 --- a/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchSinkE2ECaseBase.java +++ b/flink-connector-elasticsearch-e2e-tests/flink-connector-elasticsearch-e2e-tests-common/src/main/java/org/apache/flink/streaming/tests/ElasticsearchSinkE2ECaseBase.java @@ -38,6 +38,7 @@ import java.util.stream.Collectors; import static org.apache.flink.connector.testframe.utils.CollectIteratorAssertions.assertThat; +import static org.apache.flink.core.execution.CheckpointingMode.convertFromCheckpointingMode; import static org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition; /** Base classs for end to end ElasticsearchSink tests based on connector testing framework. */ @@ -72,7 +73,8 @@ public abstract class ElasticsearchSinkE2ECaseBase> .bindWithFlinkContainer(flink.getFlinkContainers().getJobManager()) .build(); - @Override + /** Could be removed together with dropping support of Flink 1.19. */ + @Deprecated protected void checkResultWithSemantic( ExternalSystemDataReader reader, List testData, CheckpointingMode semantic) throws Exception { @@ -93,9 +95,46 @@ protected void checkResultWithSemantic( READER_RETRY_ATTEMPTS); } + protected void checkResultWithSemantic( + ExternalSystemDataReader reader, + List testData, + org.apache.flink.core.execution.CheckpointingMode semantic) + throws Exception { + waitUntilCondition( + () -> { + try { + List result = reader.poll(Duration.ofMillis(READER_TIMEOUT)); + assertThat(sort(result).iterator()) + .matchesRecordsFromSource( + Collections.singletonList(sort(testData)), + convertFromCheckpointingMode(semantic)); + return true; + } catch (Throwable t) { + LOG.warn("Polled results not as expected", t); + return false; + } + }, + 5000, + READER_RETRY_ATTEMPTS); + } + private List sort(List list) { return list.stream().sorted().collect(Collectors.toList()); } + /** Could be removed together with dropping support of Flink 1.19. */ + @Deprecated + private static org.apache.flink.streaming.api.CheckpointingMode convertFromCheckpointingMode( + org.apache.flink.core.execution.CheckpointingMode semantic) { + switch (semantic) { + case EXACTLY_ONCE: + return org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE; + case AT_LEAST_ONCE: + return org.apache.flink.streaming.api.CheckpointingMode.AT_LEAST_ONCE; + default: + throw new IllegalArgumentException("Unsupported semantic: " + semantic); + } + } + abstract String getElasticsearchContainerName(); }