Skip to content

Commit

Permalink
[FLINK-35504] Improve Elasticsearch 8 connector observability
Browse files Browse the repository at this point in the history
  • Loading branch information
liuml07 authored and reswqa committed Jun 6, 2024
1 parent 50327f8 commit da2ef1f
Showing 1 changed file with 20 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ public class Elasticsearch8AsyncWriter<InputT> extends AsyncSinkWriter<InputT, O
private boolean close = false;

private final Counter numRecordsOutErrorsCounter;
/**
* A counter to track number of records that are returned by Elasticsearch as failed and then
* retried by this writer.
*/
private final Counter numRecordsSendPartialFailureCounter;
/** A counter to track the number of bulk requests that are sent to Elasticsearch. */
private final Counter numRequestSubmittedCounter;

private static final FatalExceptionClassifier ELASTICSEARCH_FATAL_EXCEPTION_CLASSIFIER =
FatalExceptionClassifier.createChain(
Expand Down Expand Up @@ -103,11 +110,15 @@ public Elasticsearch8AsyncWriter(
checkNotNull(metricGroup);

this.numRecordsOutErrorsCounter = metricGroup.getNumRecordsOutErrorsCounter();
this.numRecordsSendPartialFailureCounter =
metricGroup.counter("numRecordsSendPartialFailure");
this.numRequestSubmittedCounter = metricGroup.counter("numRequestSubmitted");
}

@Override
protected void submitRequestEntries(
List<Operation> requestEntries, Consumer<List<Operation>> requestResult) {
numRequestSubmittedCounter.inc();
LOG.debug("submitRequestEntries with {} items", requestEntries.size());

BulkRequest.Builder br = new BulkRequest.Builder();
Expand All @@ -133,7 +144,11 @@ private void handleFailedRequest(
List<Operation> requestEntries,
Consumer<List<Operation>> requestResult,
Throwable error) {
LOG.debug("The BulkRequest of {} operation(s) has failed.", requestEntries.size());
LOG.warn(
"The BulkRequest of {} operation(s) has failed due to: {}",
requestEntries.size(),
error.getMessage());
LOG.debug("The BulkRequest has failed", error);
numRecordsOutErrorsCounter.inc(requestEntries.size());

if (isRetryable(error.getCause())) {
Expand All @@ -145,15 +160,17 @@ private void handlePartiallyFailedRequest(
List<Operation> requestEntries,
Consumer<List<Operation>> requestResult,
BulkResponse response) {
LOG.debug("The BulkRequest has failed partially. Response: {}", response);
ArrayList<Operation> failedItems = new ArrayList<>();
for (int i = 0; i < response.items().size(); i++) {
if (response.items().get(i).error() != null) {
numRecordsOutErrorsCounter.inc();
failedItems.add(requestEntries.get(i));
}
}

LOG.debug(
numRecordsOutErrorsCounter.inc(failedItems.size());
numRecordsSendPartialFailureCounter.inc(failedItems.size());
LOG.info(
"The BulkRequest with {} operation(s) has {} failure(s). It took {}ms",
requestEntries.size(),
failedItems.size(),
Expand Down

0 comments on commit da2ef1f

Please sign in to comment.