diff --git a/.github/workflows/nightly.yml b/.github/workflows/nightly.yml index 1eb0f2e05..e167e2e32 100644 --- a/.github/workflows/nightly.yml +++ b/.github/workflows/nightly.yml @@ -25,7 +25,7 @@ jobs: if: github.repository_owner == 'apache' strategy: matrix: - flink: [1.19-SNAPSHOT, 1.20-SNAPSHOT] + flink: [1.20-SNAPSHOT] java: [ '8, 11, 17'] uses: ./.github/workflows/common.yml with: @@ -38,7 +38,7 @@ jobs: python_test: strategy: matrix: - flink: [1.19-SNAPSHOT, 1.20-SNAPSHOT] + flink: [1.20-SNAPSHOT] uses: apache/flink-connector-shared-utils/.github/workflows/python_ci.yml@ci_utils with: flink_version: ${{ matrix.flink }} diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml index b47e29fd7..759d3ae8e 100644 --- a/.github/workflows/push_pr.yml +++ b/.github/workflows/push_pr.yml @@ -26,7 +26,7 @@ jobs: uses: ./.github/workflows/common.yml strategy: matrix: - flink: [1.19.1, 1.20.0] + flink: [1.20.0] java: [ '8, 11, 17'] with: flink_version: ${{ matrix.flink }} @@ -38,7 +38,7 @@ jobs: python_test: strategy: matrix: - flink: [1.19.0, 1.20.0] + flink: [1.20.0] uses: apache/flink-connector-shared-utils/.github/workflows/python_ci.yml@ci_utils with: flink_version: ${{ matrix.flink }} diff --git a/flink-connector-aws/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java b/flink-connector-aws/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java index 6cfa60775..7660a4dd6 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java @@ -26,6 +26,7 @@ import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter; import org.apache.flink.connector.base.sink.writer.BufferedRequestState; import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.base.sink.writer.ResultHandler; import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.groups.SinkWriterMetricGroup; @@ -45,7 +46,6 @@ import java.util.List; import java.util.Properties; import java.util.concurrent.CompletableFuture; -import java.util.function.Consumer; import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier; import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier; @@ -171,7 +171,7 @@ private static FirehoseAsyncClient createFirehoseClient( @Override protected void submitRequestEntries( - List requestEntries, Consumer> requestResult) { + List requestEntries, ResultHandler resultHandler) { PutRecordBatchRequest batchRequest = PutRecordBatchRequest.builder() @@ -185,11 +185,11 @@ protected void submitRequestEntries( future.whenComplete( (response, err) -> { if (err != null) { - handleFullyFailedRequest(err, requestEntries, requestResult); + handleFullyFailedRequest(err, requestEntries, resultHandler); } else if (response.failedPutCount() > 0) { - handlePartiallyFailedRequest(response, requestEntries, requestResult); + handlePartiallyFailedRequest(response, requestEntries, resultHandler); } else { - requestResult.accept(Collections.emptyList()); + resultHandler.complete(); } }); } @@ -205,17 +205,19 @@ public void close() { } private void handleFullyFailedRequest( - Throwable err, List requestEntries, Consumer> requestResult) { + Throwable err, List requestEntries, ResultHandler resultHandler) { numRecordsOutErrorsCounter.inc(requestEntries.size()); - boolean isFatal = FIREHOSE_EXCEPTION_HANDLER.consumeIfFatal(err, getFatalExceptionCons()); + boolean isFatal = + FIREHOSE_EXCEPTION_HANDLER.consumeIfFatal( + err, resultHandler::completeExceptionally); if (isFatal) { return; } if (failOnError) { - getFatalExceptionCons() - .accept(new KinesisFirehoseException.KinesisFirehoseFailFastException(err)); + resultHandler.completeExceptionally( + new KinesisFirehoseException.KinesisFirehoseFailFastException(err)); return; } @@ -223,17 +225,17 @@ private void handleFullyFailedRequest( "KDF Sink failed to write and will retry {} entries to KDF", requestEntries.size(), err); - requestResult.accept(requestEntries); + resultHandler.retryForEntries(requestEntries); } private void handlePartiallyFailedRequest( PutRecordBatchResponse response, List requestEntries, - Consumer> requestResult) { + ResultHandler resultHandler) { numRecordsOutErrorsCounter.inc(response.failedPutCount()); if (failOnError) { - getFatalExceptionCons() - .accept(new KinesisFirehoseException.KinesisFirehoseFailFastException()); + resultHandler.completeExceptionally( + new KinesisFirehoseException.KinesisFirehoseFailFastException()); return; } @@ -248,6 +250,6 @@ private void handlePartiallyFailedRequest( } } - requestResult.accept(failedRequestEntries); + resultHandler.retryForEntries(failedRequestEntries); } } diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java index 684d018aa..aa900d517 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java @@ -25,6 +25,7 @@ import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter; import org.apache.flink.connector.base.sink.writer.BufferedRequestState; import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.base.sink.writer.ResultHandler; import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration; import org.apache.flink.connector.base.sink.writer.strategy.AIMDScalingStrategy; import org.apache.flink.connector.base.sink.writer.strategy.CongestionControlRateLimitingStrategy; @@ -48,7 +49,6 @@ import java.util.List; import java.util.Properties; import java.util.concurrent.CompletableFuture; -import java.util.function.Consumer; import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier; import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier; @@ -199,7 +199,7 @@ private static RateLimitingStrategy buildRateLimitingStrategy( @Override protected void submitRequestEntries( List requestEntries, - Consumer> requestResult) { + ResultHandler resultHandler) { PutRecordsRequest batchRequest = PutRecordsRequest.builder() @@ -213,11 +213,11 @@ protected void submitRequestEntries( future.whenComplete( (response, err) -> { if (err != null) { - handleFullyFailedRequest(err, requestEntries, requestResult); + handleFullyFailedRequest(err, requestEntries, resultHandler); } else if (response.failedRecordCount() > 0) { - handlePartiallyFailedRequest(response, requestEntries, requestResult); + handlePartiallyFailedRequest(response, requestEntries, resultHandler); } else { - requestResult.accept(Collections.emptyList()); + resultHandler.complete(); } }); } @@ -230,15 +230,15 @@ protected long getSizeInBytes(PutRecordsRequestEntry requestEntry) { private void handleFullyFailedRequest( Throwable err, List requestEntries, - Consumer> requestResult) { + ResultHandler resultHandler) { LOG.warn( "KDS Sink failed to write and will retry {} entries to KDS", requestEntries.size(), err); numRecordsOutErrorsCounter.inc(requestEntries.size()); - if (isRetryable(err)) { - requestResult.accept(requestEntries); + if (isRetryable(err, resultHandler)) { + resultHandler.retryForEntries(requestEntries); } } @@ -250,15 +250,15 @@ public void close() { private void handlePartiallyFailedRequest( PutRecordsResponse response, List requestEntries, - Consumer> requestResult) { + ResultHandler resultHandler) { LOG.warn( "KDS Sink failed to write and will retry {} entries to KDS", response.failedRecordCount()); numRecordsOutErrorsCounter.inc(response.failedRecordCount()); if (failOnError) { - getFatalExceptionCons() - .accept(new KinesisStreamsException.KinesisStreamsFailFastException()); + resultHandler.completeExceptionally( + new KinesisStreamsException.KinesisStreamsFailFastException()); return; } List failedRequestEntries = @@ -271,17 +271,19 @@ private void handlePartiallyFailedRequest( } } - requestResult.accept(failedRequestEntries); + resultHandler.retryForEntries(failedRequestEntries); } - private boolean isRetryable(Throwable err) { + private boolean isRetryable( + Throwable err, ResultHandler resultHandler) { - if (!KINESIS_FATAL_EXCEPTION_CLASSIFIER.isFatal(err, getFatalExceptionCons())) { + if (!KINESIS_FATAL_EXCEPTION_CLASSIFIER.isFatal( + err, resultHandler::completeExceptionally)) { return false; } if (failOnError) { - getFatalExceptionCons() - .accept(new KinesisStreamsException.KinesisStreamsFailFastException(err)); + resultHandler.completeExceptionally( + new KinesisStreamsException.KinesisStreamsFailFastException(err)); return false; } diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java index e0ef4c4c3..7a4e229ff 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java @@ -25,6 +25,7 @@ import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter; import org.apache.flink.connector.base.sink.writer.BufferedRequestState; import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.base.sink.writer.ResultHandler; import org.apache.flink.connector.dynamodb.sink.client.SdkClientProvider; import org.apache.flink.connector.dynamodb.util.PrimaryKeyBuilder; import org.apache.flink.metrics.Counter; @@ -45,12 +46,10 @@ import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.function.Consumer; import static java.util.Collections.singletonMap; import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier; @@ -160,7 +159,7 @@ public DynamoDbSinkWriter( @Override protected void submitRequestEntries( List requestEntries, - Consumer> requestResultConsumer) { + ResultHandler resultHandler) { List items = new ArrayList<>(); @@ -190,17 +189,17 @@ protected void submitRequestEntries( future.whenComplete( (response, err) -> { if (err != null) { - handleFullyFailedRequest(err, requestEntries, requestResultConsumer); + handleFullyFailedRequest(err, requestEntries, resultHandler); } else if (!CollectionUtil.isNullOrEmpty(response.unprocessedItems())) { - handlePartiallyUnprocessedRequest(response, requestResultConsumer); + handlePartiallyUnprocessedRequest(response, resultHandler); } else { - requestResultConsumer.accept(Collections.emptyList()); + resultHandler.complete(); } }); } private void handlePartiallyUnprocessedRequest( - BatchWriteItemResponse response, Consumer> requestResult) { + BatchWriteItemResponse response, ResultHandler resultHandler) { List unprocessed = new ArrayList<>(); for (WriteRequest writeRequest : response.unprocessedItems().get(tableName)) { @@ -211,32 +210,33 @@ private void handlePartiallyUnprocessedRequest( numRecordsSendErrorsCounter.inc(unprocessed.size()); numRecordsSendPartialFailure.inc(unprocessed.size()); - requestResult.accept(unprocessed); + resultHandler.retryForEntries(unprocessed); } private void handleFullyFailedRequest( Throwable err, List requestEntries, - Consumer> requestResult) { + ResultHandler resultHandler) { LOG.warn( "DynamoDB Sink failed to persist and will retry {} entries.", requestEntries.size(), err); numRecordsSendErrorsCounter.inc(requestEntries.size()); - if (isRetryable(err.getCause())) { - requestResult.accept(requestEntries); + if (isRetryable(err.getCause(), resultHandler)) { + resultHandler.retryForEntries(requestEntries); } } - private boolean isRetryable(Throwable err) { + private boolean isRetryable(Throwable err, ResultHandler resultHandler) { // isFatal() is really isNotFatal() - if (!DYNAMODB_FATAL_EXCEPTION_CLASSIFIER.isFatal(err, getFatalExceptionCons())) { + if (!DYNAMODB_FATAL_EXCEPTION_CLASSIFIER.isFatal( + err, resultHandler::completeExceptionally)) { return false; } if (failOnError) { - getFatalExceptionCons() - .accept(new DynamoDbSinkException.DynamoDbSinkFailFastException(err)); + resultHandler.completeExceptionally( + new DynamoDbSinkException.DynamoDbSinkFailFastException(err)); return false; } diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java index f0d139fb3..44adf5cfa 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java @@ -18,6 +18,7 @@ package org.apache.flink.connector.dynamodb.sink; +import org.apache.flink.connector.base.sink.writer.ResultHandler; import org.apache.flink.connector.base.sink.writer.TestSinkInitContext; import org.apache.flink.connector.dynamodb.sink.client.SdkClientProvider; @@ -48,8 +49,6 @@ import java.util.Optional; import java.util.Properties; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; import java.util.function.Predicate; import java.util.function.Supplier; @@ -64,7 +63,6 @@ public class DynamoDbSinkWriterTest { private static final String PARTITION_KEY = "partition_key"; private static final String SORT_KEY = "sort_key"; private static final String TABLE_NAME = "table_name"; - private static final long FUTURE_TIMEOUT_MS = 1000; @Test public void testSuccessfulRequestWithNoDeduplication() throws Exception { @@ -86,14 +84,13 @@ public void testSuccessfulRequestWithNoDeduplication() throws Exception { DynamoDbSinkWriter> dynamoDbSinkWriter = getDefaultSinkWriter( true, overwriteByPartitionKeys, () -> trackingDynamoDbAsyncClient); - CompletableFuture> failedRequests = new CompletableFuture<>(); - Consumer> failedRequestConsumer = failedRequests::complete; + TestingResultHandler resultHandler = new TestingResultHandler(); - dynamoDbSinkWriter.submitRequestEntries(inputRequests, failedRequestConsumer); + dynamoDbSinkWriter.submitRequestEntries(inputRequests, resultHandler); assertThat(trackingDynamoDbAsyncClient.getRequestHistory()) .isNotEmpty() .containsExactly(expectedClientRequests); - assertThat(failedRequests.get(FUTURE_TIMEOUT_MS, TimeUnit.MILLISECONDS)).isEmpty(); + assertThat(resultHandler.isComplete()).isTrue(); } @Test @@ -108,14 +105,13 @@ public void testPutRequestPartitionKeyDeduplication() throws Exception { DynamoDbSinkWriter> dynamoDbSinkWriter = getDefaultSinkWriter( true, overwriteByPartitionKeys, () -> trackingDynamoDbAsyncClient); - CompletableFuture> failedRequests = new CompletableFuture<>(); - Consumer> failedRequestConsumer = failedRequests::complete; - dynamoDbSinkWriter.submitRequestEntries(inputRequests, failedRequestConsumer); + TestingResultHandler resultHandler = new TestingResultHandler(); + dynamoDbSinkWriter.submitRequestEntries(inputRequests, resultHandler); assertThat(trackingDynamoDbAsyncClient.getRequestHistory()) .isNotEmpty() .containsExactly(expectedClientRequests); - assertThat(failedRequests.get(FUTURE_TIMEOUT_MS, TimeUnit.MILLISECONDS)).isEmpty(); + assertThat(resultHandler.isComplete()).isTrue(); } @Test @@ -131,14 +127,13 @@ public void testDeleteRequestPartitionKeyDeduplication() throws Exception { DynamoDbSinkWriter> dynamoDbSinkWriter = getDefaultSinkWriter( true, overwriteByPartitionKeys, () -> trackingDynamoDbAsyncClient); - CompletableFuture> failedRequests = new CompletableFuture<>(); - Consumer> failedRequestConsumer = failedRequests::complete; + TestingResultHandler resultHandler = new TestingResultHandler(); - dynamoDbSinkWriter.submitRequestEntries(inputRequests, failedRequestConsumer); + dynamoDbSinkWriter.submitRequestEntries(inputRequests, resultHandler); assertThat(trackingDynamoDbAsyncClient.getRequestHistory()) .isNotEmpty() .containsExactly(expectedClientRequests); - assertThat(failedRequests.get(FUTURE_TIMEOUT_MS, TimeUnit.MILLISECONDS)).isEmpty(); + assertThat(resultHandler.isComplete()).isTrue(); } @Test @@ -159,10 +154,9 @@ public void testMultipleKeyDeduplication() throws Exception { DynamoDbSinkWriter> dynamoDbSinkWriter = getDefaultSinkWriter( true, overwriteByPartitionKeys, () -> trackingDynamoDbAsyncClient); - CompletableFuture> failedRequests = new CompletableFuture<>(); - Consumer> failedRequestConsumer = failedRequests::complete; + TestingResultHandler resultHandler = new TestingResultHandler(); - dynamoDbSinkWriter.submitRequestEntries(inputRequests, failedRequestConsumer); + dynamoDbSinkWriter.submitRequestEntries(inputRequests, resultHandler); // Order does not matter in a batch write request assertThat(trackingDynamoDbAsyncClient.getRequestHistory()) .isNotEmpty() @@ -171,7 +165,7 @@ public void testMultipleKeyDeduplication() throws Exception { assertThat(clientBatchRequest) .containsExactlyInAnyOrderElementsOf( expectedClientRequests)); - assertThat(failedRequests.get(FUTURE_TIMEOUT_MS, TimeUnit.MILLISECONDS)).isEmpty(); + assertThat(resultHandler.isComplete()).isTrue(); } @Test @@ -193,12 +187,12 @@ public void testRetryableExceptionWhenFailOnErrorOffWillRetry() throws Exception DynamoDbSinkWriter> dynamoDbSinkWriter = getDefaultSinkWriter( failOnError, Collections.emptyList(), () -> throwingDynamoDbAsyncClient); - CompletableFuture> failedRequests = new CompletableFuture<>(); - Consumer> failedRequestConsumer = failedRequests::complete; + TestingResultHandler resultHandler = new TestingResultHandler(); - dynamoDbSinkWriter.submitRequestEntries(inputRequests, failedRequestConsumer); + dynamoDbSinkWriter.submitRequestEntries(inputRequests, resultHandler); - assertThat(failedRequests.get(FUTURE_TIMEOUT_MS, TimeUnit.MILLISECONDS)) + assertThat(resultHandler.isComplete()).isFalse(); + assertThat(resultHandler.getRetryEntries()) .containsExactlyInAnyOrderElementsOf(inputRequests); } @@ -224,12 +218,12 @@ public void testPartiallyFailedRequestRetriesFailedRecords() throws Exception { failOnError, Collections.emptyList(), () -> failingRecordsDynamoDbAsyncClient); - CompletableFuture> failedRequests = new CompletableFuture<>(); - Consumer> failedRequestConsumer = failedRequests::complete; + TestingResultHandler resultHandler = new TestingResultHandler(); - dynamoDbSinkWriter.submitRequestEntries(inputRequests, failedRequestConsumer); + dynamoDbSinkWriter.submitRequestEntries(inputRequests, resultHandler); - assertThat(failedRequests.get(FUTURE_TIMEOUT_MS, TimeUnit.MILLISECONDS)) + assertThat(resultHandler.isComplete()).isFalse(); + assertThat(resultHandler.getRetryEntries()) .usingRecursiveComparison() .isEqualTo(expectedRetriedRecords); } @@ -334,11 +328,19 @@ private void assertThatRequestsAreNotRetried( DynamoDbSinkWriter> dynamoDbSinkWriter = getDefaultSinkWriter( failOnError, Collections.emptyList(), () -> throwingDynamoDbAsyncClient); - CompletableFuture> failedRequests = new CompletableFuture<>(); - Consumer> failedRequestConsumer = failedRequests::complete; + TestingResultHandler resultHandler = new TestingResultHandler(); - dynamoDbSinkWriter.submitRequestEntries(getDefaultInputRequests(), failedRequestConsumer); - assertThat(failedRequests).isNotCompleted(); + dynamoDbSinkWriter.submitRequestEntries(getDefaultInputRequests(), resultHandler); + assertThat(resultHandler.isComplete()).isFalse(); + assertThat(resultHandler.getRetryEntries()).isEmpty(); + // assert exceptionToThrow is thrown or wrapped + assertThat(resultHandler.getException()) + .satisfies( + e -> + assertThat( + (e == exceptionToThrow.get() + || e.getCause() == exceptionToThrow.get())) + .isTrue()); } private DynamoDbSinkWriter> getDefaultSinkWriter( @@ -442,6 +444,41 @@ private Map itemWithPayload( return item; } + private static class TestingResultHandler implements ResultHandler { + + private boolean isComplete = false; + private Exception exception; + + private List retryEntries = new ArrayList<>(); + + @Override + public void complete() { + isComplete = true; + } + + @Override + public void completeExceptionally(Exception e) { + exception = e; + } + + @Override + public void retryForEntries(List list) { + retryEntries.addAll(list); + } + + public boolean isComplete() { + return isComplete; + } + + public Exception getException() { + return exception; + } + + public List getRetryEntries() { + return retryEntries; + } + } + private static class TestAsyncDynamoDbClientProvider implements SdkClientProvider { diff --git a/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkWriter.java b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkWriter.java index 459e06807..dfb845e58 100644 --- a/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkWriter.java +++ b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkWriter.java @@ -25,6 +25,7 @@ import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter; import org.apache.flink.connector.base.sink.writer.BufferedRequestState; import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.base.sink.writer.ResultHandler; import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration; import org.apache.flink.connector.sqs.sink.client.SdkClientProvider; import org.apache.flink.metrics.Counter; @@ -41,11 +42,9 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.function.Consumer; import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier; import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier; @@ -123,7 +122,7 @@ class SqsSinkWriter extends AsyncSinkWriter requestEntries, - Consumer> requestResult) { + ResultHandler resultHandler) { final SendMessageBatchRequest batchRequest = SendMessageBatchRequest.builder().entries(requestEntries).queueUrl(sqsUrl).build(); @@ -134,20 +133,19 @@ protected void submitRequestEntries( future.whenComplete( (response, err) -> { if (err != null) { - handleFullyFailedRequest(err, requestEntries, requestResult); - } else if (response.failed() != null && response.failed().size() > 0) { + handleFullyFailedRequest(err, requestEntries, resultHandler); + } else if (response.failed() != null && !response.failed().isEmpty()) { handlePartiallyFailedRequest( - response, requestEntries, requestResult); + response, requestEntries, resultHandler); } else { - requestResult.accept(Collections.emptyList()); + resultHandler.complete(); } }) .exceptionally( ex -> { - getFatalExceptionCons() - .accept( - new SqsSinkException.SqsFailFastSinkException( - ex.getMessage(), ex)); + resultHandler.completeExceptionally( + new SqsSinkException.SqsFailFastSinkException( + ex.getMessage(), ex)); return null; }); } @@ -165,16 +163,17 @@ public void close() { private void handleFullyFailedRequest( Throwable err, List requestEntries, - Consumer> requestResult) { + ResultHandler resultHandler) { numRecordsOutErrorsCounter.inc(requestEntries.size()); - boolean isFatal = SQS_EXCEPTION_HANDLER.consumeIfFatal(err, getFatalExceptionCons()); + boolean isFatal = + SQS_EXCEPTION_HANDLER.consumeIfFatal(err, resultHandler::completeExceptionally); if (isFatal) { return; } if (failOnError) { - getFatalExceptionCons().accept(new SqsSinkException.SqsFailFastSinkException(err)); + resultHandler.completeExceptionally(new SqsSinkException.SqsFailFastSinkException(err)); return; } @@ -183,13 +182,13 @@ private void handleFullyFailedRequest( requestEntries.size(), requestEntries.get(0).toString(), err); - requestResult.accept(requestEntries); + resultHandler.retryForEntries(requestEntries); } private void handlePartiallyFailedRequest( SendMessageBatchResponse response, List requestEntries, - Consumer> requestResult) { + ResultHandler resultHandler) { LOG.warn( "handlePartiallyFailedRequest: SQS Sink failed to write and will retry {} entries to SQS", @@ -197,7 +196,7 @@ private void handlePartiallyFailedRequest( numRecordsOutErrorsCounter.inc(response.failed().size()); if (failOnError) { - getFatalExceptionCons().accept(new SqsSinkException.SqsFailFastSinkException()); + resultHandler.completeExceptionally(new SqsSinkException.SqsFailFastSinkException()); return; } @@ -212,15 +211,14 @@ private void handlePartiallyFailedRequest( } else { LOG.error( "handlePartiallyFailedRequest: SQS Sink failed to retry unsuccessful SQS publish request due to invalid failed requestId"); - getFatalExceptionCons() - .accept( - new SqsSinkException.SqsFailFastSinkException( - "SQS Sink failed to retry unsuccessful SQS publish request due to invalid failed requestId")); + resultHandler.completeExceptionally( + new SqsSinkException.SqsFailFastSinkException( + "SQS Sink failed to retry unsuccessful SQS publish request due to invalid failed requestId")); return; } } - requestResult.accept(failedRequestEntries); + resultHandler.retryForEntries(failedRequestEntries); } private Optional getFailedRecord( diff --git a/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkWriterTest.java b/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkWriterTest.java index 38c25f911..c8d7073e2 100644 --- a/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkWriterTest.java +++ b/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkWriterTest.java @@ -21,10 +21,10 @@ import org.apache.flink.api.connector.sink2.SinkWriter; import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils; import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.base.sink.writer.ResultHandler; import org.apache.flink.connector.base.sink.writer.TestSinkInitContext; import org.apache.flink.connector.sqs.sink.client.SdkClientProvider; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import software.amazon.awssdk.awscore.exception.AwsErrorDetails; import software.amazon.awssdk.core.exception.SdkClientException; @@ -48,7 +48,6 @@ import java.util.Properties; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; -import java.util.function.Consumer; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; @@ -71,12 +70,10 @@ public void testNonRetryableExceptionWhenFailOnErrorFalseWillNotRetry() throws I TestSqsAsyncClientProvider testSqsAsyncClientProvider = new TestSqsAsyncClientProvider(sqsAsyncClient); sinkWriter = getSqsSinkWriter(false, testSqsAsyncClientProvider); - CompletableFuture> failedRequests = - new CompletableFuture<>(); - Consumer> failedRequestConsumer = - failedRequests::complete; - sinkWriter.submitRequestEntries(getDefaultInputRequests(), failedRequestConsumer); - assertThat(failedRequests).isNotCompleted(); + TestingResultHandler resultHandler = new TestingResultHandler(); + sinkWriter.submitRequestEntries(getDefaultInputRequests(), resultHandler); + assertThat(resultHandler.isComplete()).isFalse(); + assertThat(resultHandler.getExceptionThrown()).isNotNull(); } @Test @@ -87,12 +84,10 @@ public void testRetryableExceptionWhenFailOnErrorTrueWillNotRetry() throws IOExc new TestSqsAsyncClientProvider(sqsAsyncClient); sinkWriter = getSqsSinkWriter(true, testSqsAsyncClientProvider); - CompletableFuture> failedRequests = - new CompletableFuture<>(); - Consumer> failedRequestConsumer = - failedRequests::complete; - sinkWriter.submitRequestEntries(getDefaultInputRequests(), failedRequestConsumer); - assertThat(failedRequests).isNotCompleted(); + TestingResultHandler resultHandler = new TestingResultHandler(); + sinkWriter.submitRequestEntries(getDefaultInputRequests(), resultHandler); + assertThat(resultHandler.isComplete()).isFalse(); + assertThat(resultHandler.getExceptionThrown()).isNotNull(); } @Test @@ -103,12 +98,10 @@ public void testRetryableExceptionWhenFailOnErrorFalseWillRetry() throws IOExcep new TestSqsAsyncClientProvider(sqsAsyncClient); sinkWriter = getSqsSinkWriter(false, testSqsAsyncClientProvider); - CompletableFuture> failedRequests = - new CompletableFuture<>(); - Consumer> failedRequestConsumer = - failedRequests::complete; - sinkWriter.submitRequestEntries(getDefaultInputRequests(), failedRequestConsumer); - assertThat(failedRequests).isCompleted(); + TestingResultHandler resultHandler = new TestingResultHandler(); + sinkWriter.submitRequestEntries(getDefaultInputRequests(), resultHandler); + assertThat(resultHandler.isComplete()).isFalse(); + assertThat(resultHandler.getFailedRequests()).hasSize(2); } @Test @@ -118,13 +111,12 @@ public void testSubmitRequestEntriesWithNoException() throws IOException { new TestSqsAsyncClientProvider(sqsAsyncClient); sinkWriter = getSqsSinkWriter(false, testSqsAsyncClientProvider); - CompletableFuture> requests = new CompletableFuture<>(); - Consumer> requestResult = requests::complete; + TestingResultHandler resultHandler = new TestingResultHandler(); - sinkWriter.submitRequestEntries(getDefaultInputRequests(), requestResult); + sinkWriter.submitRequestEntries(getDefaultInputRequests(), resultHandler); - List result = requests.join(); - Assertions.assertTrue(result.isEmpty()); + assertThat(resultHandler.isComplete()).isTrue(); + assertThat(resultHandler.getFailedRequests()).isEmpty(); } @Test @@ -135,13 +127,10 @@ public void testSubmitRequestEntriesWithPartialSuccessWithFailOnErrorFalseWillRe new TestSqsAsyncClientProvider(sqsAsyncClient); sinkWriter = getSqsSinkWriter(false, testSqsAsyncClientProvider); - CompletableFuture> requests = new CompletableFuture<>(); - Consumer> requestResult = requests::complete; + TestingResultHandler resultHandler = new TestingResultHandler(); + sinkWriter.submitRequestEntries(getDefaultInputRequests(), resultHandler); - sinkWriter.submitRequestEntries(getDefaultInputRequests(), requestResult); - - List result = requests.join(); - Assertions.assertEquals(1, result.size()); + assertThat(resultHandler.getFailedRequests()).hasSize(1); } @Test @@ -153,11 +142,11 @@ public void testSubmitRequestEntriesWithPartialSuccessWithFailOnErrorFalseWillRe new TestSqsAsyncClientProvider(sqsAsyncClient); sinkWriter = getSqsSinkWriter(false, testSqsAsyncClientProvider); - CompletableFuture> requests = new CompletableFuture<>(); - Consumer> requestResult = requests::complete; + TestingResultHandler resultHandler = new TestingResultHandler(); - sinkWriter.submitRequestEntries(getDefaultInputRequests(), requestResult); - assertThat(requests).isNotCompleted(); + sinkWriter.submitRequestEntries(getDefaultInputRequests(), resultHandler); + assertThat(resultHandler.isComplete()).isFalse(); + assertThat(resultHandler.getExceptionThrown()).isNotNull(); } @Test @@ -169,11 +158,11 @@ public void testSubmitRequestEntriesWithPartialSuccessWithFailOnErrorTrueWillNot new TestSqsAsyncClientProvider(sqsAsyncClient); sinkWriter = getSqsSinkWriter(true, testSqsAsyncClientProvider); - CompletableFuture> requests = new CompletableFuture<>(); - Consumer> requestResult = requests::complete; + TestingResultHandler resultHandler = new TestingResultHandler(); - sinkWriter.submitRequestEntries(getDefaultInputRequests(), requestResult); - assertThat(requests).isNotCompleted(); + sinkWriter.submitRequestEntries(getDefaultInputRequests(), resultHandler); + assertThat(resultHandler.isComplete()).isFalse(); + assertThat(resultHandler.getExceptionThrown()).isNotNull(); } @Test @@ -283,6 +272,40 @@ private Optional getGenericNonRetryableException() { .build()); } + private static class TestingResultHandler + implements ResultHandler { + private boolean isComplete = false; + private Exception exceptionThrown = null; + private final List failedRequests = new ArrayList<>(); + + @Override + public void complete() { + isComplete = true; + } + + @Override + public void completeExceptionally(Exception e) { + exceptionThrown = e; + } + + @Override + public void retryForEntries(List list) { + failedRequests.addAll(list); + } + + public boolean isComplete() { + return isComplete; + } + + public Exception getExceptionThrown() { + return exceptionThrown; + } + + public List getFailedRequests() { + return failedRequests; + } + } + private static class ThrowingSqsAsyncClient implements SqsAsyncClient { private final Optional errorToReturn; diff --git a/pom.xml b/pom.xml index d52f695ed..12cd37655 100644 --- a/pom.xml +++ b/pom.xml @@ -56,7 +56,7 @@ under the License. 1.12.439 2.26.19 4.1.86.Final - 1.19.0 + 1.20.0 2.14.3 1.1.18 32.1.3-jre