From 89a3dd2ccbe3d556687a1e63bf22f7bee9656a52 Mon Sep 17 00:00:00 2001 From: sagIoTPower Date: Tue, 1 Oct 2024 10:53:58 +0200 Subject: [PATCH] add error message and status FAILED if kafka connection fails --- .../mapping/connector/kafka/KafkaClient.java | 5 +- .../connector/kafka/TopicConsumer.java | 312 ++++++++++-------- 2 files changed, 170 insertions(+), 147 deletions(-) diff --git a/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/kafka/KafkaClient.java b/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/kafka/KafkaClient.java index 14bd4e8b3..609ee3556 100644 --- a/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/kafka/KafkaClient.java +++ b/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/kafka/KafkaClient.java @@ -184,7 +184,7 @@ public KafkaClient(ConfigurationRegistry configurationRegistry, // defaultPropertiesConsumer.put("sasl.mechanism", "SCRAM-SHA-256"); // defaultPropertiesConsumer.put("sasl.jaas.config", jaasCfg); // defaultPropertiesConsumer.put("linger.ms", 1); - updateConnectorStatusAndSend(ConnectorStatus.UNKNOWN, true, true); + // updateConnectorStatusAndSend(ConnectorStatus.UNKNOWN, true, true); } private String bootstrapServers; @@ -325,7 +325,8 @@ public String getConnectorName() { public void subscribe(String topic, QOS qos) throws ConnectorException { TopicConsumer kafkaConsumer = new TopicConsumer( new TopicConfig(tenant, bootstrapServers, topic, username, password, saslMechanism, groupId, - defaultPropertiesConsumer)); + defaultPropertiesConsumer), + connectorStatus); consumerList.put(topic, kafkaConsumer); TopicConsumerCallback topicConsumerCallback = new TopicConsumerCallback(dispatcher, tenant, getConnectorIdent(), topic, true); diff --git a/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/kafka/TopicConsumer.java b/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/kafka/TopicConsumer.java index 9513a08d3..1478d30d3 100644 --- a/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/kafka/TopicConsumer.java +++ b/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/kafka/TopicConsumer.java @@ -23,150 +23,172 @@ import org.apache.kafka.common.errors.TopicAuthorizationException; +import dynamic.mapping.core.ConnectorStatusEvent; +import dynamic.mapping.core.ConnectorStatus; + public class TopicConsumer { - private final TopicConfig topicConfig; - - private ConsumingThread consumingThread; // guarded by this - private boolean closed; // guarded by this - - public TopicConsumer(final TopicConfig topicConfig) { - this.topicConfig = topicConfig; - } - - public synchronized void start(final TopicConsumerListener listener) { - if (closed) { - throw new IllegalStateException("Closed"); - } - - if (consumingThread != null) { - throw new IllegalStateException("Already started"); - } - - final ConsumingThread ct = new ConsumingThread(listener); - ct.start(); - consumingThread = ct; - } - - public void stop() throws InterruptedException { - final ConsumingThread ct; - synchronized (this) { - ct = consumingThread; - - if (ct == null) { - return; - } - - consumingThread = null; - } - - ct.close(); - - if (Thread.currentThread() != ct) { - ct.join(); - } - } - - public boolean shouldStop() { - if (consumingThread == null) - return true; - return consumingThread.shouldStop; - } - - public void close() throws InterruptedException { - synchronized (this) { - if (closed) { - return; - } - closed = true; - } - - stop(); - } - - private class ConsumingThread extends Thread { - private final TopicConsumerListener listener; - private volatile boolean closed; - boolean shouldStop = false; - - ConsumingThread(final TopicConsumerListener listener) { - super("Consumer#" + topicConfig.getBootstrapServers() + "/" + topicConfig.getTopic()); - this.listener = listener; - } - - @Override - public void run() { - Exception error = null; - boolean continueToListen = true; - - while (continueToListen) { - Topic tc = null; - try { - tc = new Topic(topicConfig); - - try { - listener.onStarted(); - } catch (final Exception e) { - // log ("Unexpected error while onStarted() notification", e); - } - - // we consume the events from the topic until - // this thread is interrupted by close() - tc.consumeUntilError(listener); - } catch (final Exception e) { - if (closed) { - break; - } - error = e; - if (error instanceof TopicAuthorizationException) { - continueToListen = false; - shouldStop = true; - } - } finally { - if (tc != null) { - try { - tc.close(); - } catch (final Exception ignore) { - } - } - } - - try { - listener.onStoppedByErrorAndReconnecting(error); - } catch (final Exception e) { - // log ("Unexpected error while onStoppedByErrorAndReconnecting() notification", - // e) - } - - try { - Thread.sleep(5000); // TODO: make the timeout configurable and use backoff with jitter - } catch (final InterruptedException e) { - break; // interrupted by close() - // we don't restore the flag interrupted, since we still need - // to do some additional work like - // to notify listener.onStopped() - } - } - - try { - listener.onStopped(); - } catch (final Exception e) { - // log ("Unexpected error while onStoppedByErrorAndReconnecting() notification", - // e); - } - } - - void close() { - if (closed) { // no atomicity/membars required - return; // since can be called only by one single thread - } - closed = true; - - // We stop the consuming with org.apache.kafka.common.errors.InterruptException - // In here it isn't convenient to call Topic.close() directly to initiate - // org.apache.kafka.common.errors.WakeupException, since we recreate - // the instance of Topic and it takes additional efforts to share the - // changeable reference to a Topic to close it from other thread. - interrupt(); - } - } + private final TopicConfig topicConfig; + private final ConnectorStatusEvent connectorStatus; + + private ConsumingThread consumingThread; // guarded by this + private boolean closed; // guarded by this + + public TopicConsumer(final TopicConfig topicConfig, ConnectorStatusEvent connectorStatus) { + this.topicConfig = topicConfig; + this.connectorStatus = connectorStatus; + } + + public synchronized void start(final TopicConsumerListener listener) { + if (closed) { + throw new IllegalStateException("Closed"); + } + + if (consumingThread != null) { + throw new IllegalStateException("Already started"); + } + + final ConsumingThread ct = new ConsumingThread(listener, connectorStatus); + ct.start(); + consumingThread = ct; + } + + public void stop() throws InterruptedException { + final ConsumingThread ct; + synchronized (this) { + ct = consumingThread; + + if (ct == null) { + return; + } + + consumingThread = null; + } + + ct.close(); + + if (Thread.currentThread() != ct) { + ct.join(); + } + } + + public boolean shouldStop() { + if (consumingThread == null) + return true; + return consumingThread.shouldStop; + } + + public void close() throws InterruptedException { + synchronized (this) { + if (closed) { + return; + } + closed = true; + } + + stop(); + } + + private class ConsumingThread extends Thread { + private static final int WAIT_MS_RECONNECT = 30000; + private final TopicConsumerListener listener; + private final ConnectorStatusEvent connectorStatus; + private volatile boolean closed; + boolean shouldStop = false; + + ConsumingThread(final TopicConsumerListener listener, ConnectorStatusEvent connectorStatus) { + super("Consumer#" + topicConfig.getBootstrapServers() + "/" + topicConfig.getTopic()); + this.listener = listener; + this.connectorStatus = connectorStatus; + + } + + @Override + public void run() { + Exception error = null; + boolean continueToListen = true; + + while (continueToListen) { + Topic tc = null; + try { + tc = new Topic(topicConfig); + + try { + listener.onStarted(); + } catch (final Exception e) { + // log ("Unexpected error while onStarted() notification", e); + } + + // we consume the events from the topic until + // this thread is interrupted by close() + connectorStatus.updateStatus(ConnectorStatus.CONNECTED, true); + tc.consumeUntilError(listener); + } catch (final Exception e) { + if (closed) { + break; + } + error = e; + if (error instanceof TopicAuthorizationException) { + continueToListen = false; + shouldStop = true; + } + } finally { + if (tc != null) { + try { + tc.close(); + } catch (final Exception ignore) { + } + } + } + + try { + listener.onStoppedByErrorAndReconnecting(error); + updateConnectorStatusToFailed(error); + + } catch (final Exception e) { + // log ("Unexpected error while onStoppedByErrorAndReconnecting() notification", + // e) + } + + try { + Thread.sleep(WAIT_MS_RECONNECT); // TODO: make the timeout configurable and use backoff with jitter + } catch (final InterruptedException e) { + break; // interrupted by close() + // we don't restore the flag interrupted, since we still need + // to do some additional work like + // to notify listener.onStopped() + } + } + + try { + listener.onStopped(); + } catch (final Exception e) { + // log ("Unexpected error while onStoppedByErrorAndReconnecting() notification", + // e); + } + } + + protected void updateConnectorStatusToFailed(Exception e) { + String msg = " --- " + e.getClass().getName() + ": " + + e.getMessage(); + if (!(e.getCause() == null)) { + msg = msg + " --- Caused by " + e.getCause().getClass().getName() + ": " + e.getCause().getMessage(); + } + connectorStatus.setMessage(msg); + connectorStatus.updateStatus(ConnectorStatus.FAILED, false); + } + + void close() { + if (closed) { // no atomicity/membars required + return; // since can be called only by one single thread + } + closed = true; + + // We stop the consuming with org.apache.kafka.common.errors.InterruptException + // In here it isn't convenient to call Topic.close() directly to initiate + // org.apache.kafka.common.errors.WakeupException, since we recreate + // the instance of Topic and it takes additional efforts to share the + // changeable reference to a Topic to close it from other thread. + interrupt(); + } + } } \ No newline at end of file