diff --git a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java index a3eb8df4..1163bf26 100644 --- a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java +++ b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java @@ -62,6 +62,13 @@ public class Elasticsearch8AsyncWriter extends AsyncSinkWriter requestEntries, Consumer> requestResult) { + numRequestSubmittedCounter.inc(); LOG.debug("submitRequestEntries with {} items", requestEntries.size()); BulkRequest.Builder br = new BulkRequest.Builder(); @@ -133,7 +144,11 @@ private void handleFailedRequest( List requestEntries, Consumer> requestResult, Throwable error) { - LOG.debug("The BulkRequest of {} operation(s) has failed.", requestEntries.size()); + LOG.warn( + "The BulkRequest of {} operation(s) has failed due to: {}", + requestEntries.size(), + error.getMessage()); + LOG.debug("The BulkRequest has failed", error); numRecordsOutErrorsCounter.inc(requestEntries.size()); if (isRetryable(error.getCause())) { @@ -145,15 +160,17 @@ private void handlePartiallyFailedRequest( List requestEntries, Consumer> requestResult, BulkResponse response) { + LOG.debug("The BulkRequest has failed partially. Response: {}", response); ArrayList failedItems = new ArrayList<>(); for (int i = 0; i < response.items().size(); i++) { if (response.items().get(i).error() != null) { - numRecordsOutErrorsCounter.inc(); failedItems.add(requestEntries.get(i)); } } - LOG.debug( + numRecordsOutErrorsCounter.inc(failedItems.size()); + numRecordsSendPartialFailureCounter.inc(failedItems.size()); + LOG.info( "The BulkRequest with {} operation(s) has {} failure(s). It took {}ms", requestEntries.size(), failedItems.size(),