diff --git a/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/kafka/TopicConsumer.java b/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/kafka/TopicConsumer.java index 5a35f83c..7a7ae86b 100644 --- a/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/kafka/TopicConsumer.java +++ b/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/kafka/TopicConsumer.java @@ -142,6 +142,7 @@ public void run() { try { listener.onStoppedByErrorAndReconnecting(error); + updateConnectorStatusToFailed(error, topicConfig.getTopic()); } catch (final Exception e) { // log ("Unexpected error while onStoppedByErrorAndReconnecting() notification", // e) @@ -165,6 +166,16 @@ public void run() { } } + protected void updateConnectorStatusToFailed(Exception e, String topic) { + String msg = "Topic:" + topic + " --- " + e.getClass().getName() + ": " + + e.getMessage(); + if (!(e.getCause() == null)) { + msg = msg + " --- Caused by " + e.getCause().getClass().getName() + ": " + e.getCause().getMessage(); + } + connectorStatus.setMessage(msg); + connectorStatus.updateStatus(ConnectorStatus.FAILED, false); + } + void close() { if (closed) { // no atomicity/membars required return; // since can be called only by one single thread