Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/develop' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
switschel committed Oct 2, 2024
2 parents f4f6d30 + 4ff7c11 commit 80b40e1
Show file tree
Hide file tree
Showing 8 changed files with 314 additions and 182 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ public KafkaClient(ConfigurationRegistry configurationRegistry,
// defaultPropertiesConsumer.put("sasl.mechanism", "SCRAM-SHA-256");
// defaultPropertiesConsumer.put("sasl.jaas.config", jaasCfg);
// defaultPropertiesConsumer.put("linger.ms", 1);
updateConnectorStatusAndSend(ConnectorStatus.UNKNOWN, true, true);
// updateConnectorStatusAndSend(ConnectorStatus.UNKNOWN, true, true);
}

private String bootstrapServers;
Expand Down Expand Up @@ -325,7 +325,8 @@ public String getConnectorName() {
public void subscribe(String topic, QOS qos) throws ConnectorException {
TopicConsumer kafkaConsumer = new TopicConsumer(
new TopicConfig(tenant, bootstrapServers, topic, username, password, saslMechanism, groupId,
defaultPropertiesConsumer));
defaultPropertiesConsumer),
connectorStatus);
consumerList.put(topic, kafkaConsumer);
TopicConsumerCallback topicConsumerCallback = new TopicConsumerCallback(dispatcher, tenant, getConnectorIdent(),
topic, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,150 +23,171 @@

import org.apache.kafka.common.errors.TopicAuthorizationException;

import dynamic.mapping.core.ConnectorStatusEvent;
import dynamic.mapping.core.ConnectorStatus;

public class TopicConsumer {
private final TopicConfig topicConfig;

private ConsumingThread consumingThread; // guarded by this
private boolean closed; // guarded by this

public TopicConsumer(final TopicConfig topicConfig) {
this.topicConfig = topicConfig;
}

public synchronized void start(final TopicConsumerListener listener) {
if (closed) {
throw new IllegalStateException("Closed");
}

if (consumingThread != null) {
throw new IllegalStateException("Already started");
}

final ConsumingThread ct = new ConsumingThread(listener);
ct.start();
consumingThread = ct;
}

public void stop() throws InterruptedException {
final ConsumingThread ct;
synchronized (this) {
ct = consumingThread;

if (ct == null) {
return;
}

consumingThread = null;
}

ct.close();

if (Thread.currentThread() != ct) {
ct.join();
}
}

public boolean shouldStop() {
if (consumingThread == null)
return true;
return consumingThread.shouldStop;
}

public void close() throws InterruptedException {
synchronized (this) {
if (closed) {
return;
}
closed = true;
}

stop();
}

private class ConsumingThread extends Thread {
private final TopicConsumerListener listener;
private volatile boolean closed;
boolean shouldStop = false;

ConsumingThread(final TopicConsumerListener listener) {
super("Consumer#" + topicConfig.getBootstrapServers() + "/" + topicConfig.getTopic());
this.listener = listener;
}

@Override
public void run() {
Exception error = null;
boolean continueToListen = true;

while (continueToListen) {
Topic tc = null;
try {
tc = new Topic(topicConfig);

try {
listener.onStarted();
} catch (final Exception e) {
// log ("Unexpected error while onStarted() notification", e);
}

// we consume the events from the topic until
// this thread is interrupted by close()
tc.consumeUntilError(listener);
} catch (final Exception e) {
if (closed) {
break;
}
error = e;
if (error instanceof TopicAuthorizationException) {
continueToListen = false;
shouldStop = true;
}
} finally {
if (tc != null) {
try {
tc.close();
} catch (final Exception ignore) {
}
}
}

try {
listener.onStoppedByErrorAndReconnecting(error);
} catch (final Exception e) {
// log ("Unexpected error while onStoppedByErrorAndReconnecting() notification",
// e)
}

try {
Thread.sleep(5000); // TODO: make the timeout configurable and use backoff with jitter
} catch (final InterruptedException e) {
break; // interrupted by close()
// we don't restore the flag interrupted, since we still need
// to do some additional work like
// to notify listener.onStopped()
}
}

try {
listener.onStopped();
} catch (final Exception e) {
// log ("Unexpected error while onStoppedByErrorAndReconnecting() notification",
// e);
}
}

void close() {
if (closed) { // no atomicity/membars required
return; // since can be called only by one single thread
}
closed = true;

// We stop the consuming with org.apache.kafka.common.errors.InterruptException
// In here it isn't convenient to call Topic.close() directly to initiate
// org.apache.kafka.common.errors.WakeupException, since we recreate
// the instance of Topic and it takes additional efforts to share the
// changeable reference to a Topic to close it from other thread.
interrupt();
}
}
private final TopicConfig topicConfig;
private final ConnectorStatusEvent connectorStatus;

private ConsumingThread consumingThread; // guarded by this
private boolean closed; // guarded by this

public TopicConsumer(final TopicConfig topicConfig, ConnectorStatusEvent connectorStatus) {
this.topicConfig = topicConfig;
this.connectorStatus = connectorStatus;
}

public synchronized void start(final TopicConsumerListener listener) {
if (closed) {
throw new IllegalStateException("Closed");
}

if (consumingThread != null) {
throw new IllegalStateException("Already started");
}

final ConsumingThread ct = new ConsumingThread(listener, connectorStatus);
ct.start();
consumingThread = ct;
}

public void stop() throws InterruptedException {
final ConsumingThread ct;
synchronized (this) {
ct = consumingThread;

if (ct == null) {
return;
}

consumingThread = null;
}

ct.close();

if (Thread.currentThread() != ct) {
ct.join();
}
}

public boolean shouldStop() {
if (consumingThread == null)
return true;
return consumingThread.shouldStop;
}

public void close() throws InterruptedException {
synchronized (this) {
if (closed) {
return;
}
closed = true;
}

stop();
}

private class ConsumingThread extends Thread {
private static final int WAIT_MS_RECONNECT = 30000;
private final TopicConsumerListener listener;
private final ConnectorStatusEvent connectorStatus;
private volatile boolean closed;
boolean shouldStop = false;

ConsumingThread(final TopicConsumerListener listener, ConnectorStatusEvent connectorStatus) {
super("Consumer#" + topicConfig.getBootstrapServers() + "/" + topicConfig.getTopic());
this.listener = listener;
this.connectorStatus = connectorStatus;

}

@Override
public void run() {
Exception error = null;
boolean continueToListen = true;

while (continueToListen) {
Topic tc = null;
try {
tc = new Topic(topicConfig);

try {
listener.onStarted();
} catch (final Exception e) {
// log ("Unexpected error while onStarted() notification", e);
}

// we consume the events from the topic until
// this thread is interrupted by close()
connectorStatus.updateStatus(ConnectorStatus.CONNECTED, true);
tc.consumeUntilError(listener);
} catch (final Exception e) {
if (closed) {
break;
}
error = e;
if (error instanceof TopicAuthorizationException) {
continueToListen = false;
shouldStop = true;
}
} finally {
if (tc != null) {
try {
tc.close();
} catch (final Exception ignore) {
}
}
}

try {
listener.onStoppedByErrorAndReconnecting(error);
updateConnectorStatusToFailed(error, topicConfig.getTopic());
} catch (final Exception e) {
// log ("Unexpected error while onStoppedByErrorAndReconnecting() notification",
// e)
}

try {
Thread.sleep(WAIT_MS_RECONNECT); // TODO: make the timeout configurable and use backoff with jitter
} catch (final InterruptedException e) {
break; // interrupted by close()
// we don't restore the flag interrupted, since we still need
// to do some additional work like
// to notify listener.onStopped()
}
}

try {
listener.onStopped();
} catch (final Exception e) {
// log ("Unexpected error while onStoppedByErrorAndReconnecting() notification",
// e);
}
}

protected void updateConnectorStatusToFailed(Exception e, String topic) {
String msg = "Topic:" + topic + " --- " + e.getClass().getName() + ": "
+ e.getMessage();
if (!(e.getCause() == null)) {
msg = msg + " --- Caused by " + e.getCause().getClass().getName() + ": " + e.getCause().getMessage();
}
connectorStatus.setMessage(msg);
connectorStatus.updateStatus(ConnectorStatus.FAILED, false);
}

void close() {
if (closed) { // no atomicity/membars required
return; // since can be called only by one single thread
}
closed = true;

// We stop the consuming with org.apache.kafka.common.errors.InterruptException
// In here it isn't convenient to call Topic.close() directly to initiate
// org.apache.kafka.common.errors.WakeupException, since we recreate
// the instance of Topic and it takes additional efforts to share the
// changeable reference to a Topic to close it from other thread.
interrupt();
}
}
}
Loading

0 comments on commit 80b40e1

Please sign in to comment.