Skip to content

Commit

Permalink
Merge pull request #252 from SoftwareAG/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
switschel authored Sep 5, 2024
2 parents b811e05 + ae7aa9a commit 9302413
Show file tree
Hide file tree
Showing 49 changed files with 809 additions and 604 deletions.
18 changes: 0 additions & 18 deletions .github/dependabot.yml

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
import dynamic.mapping.core.MappingComponent;
import dynamic.mapping.core.ConnectorStatus;
import dynamic.mapping.processor.model.ProcessingContext;
import dynamic.mapping.rest.MappingRestController;

@Slf4j
public abstract class AConnectorClient {
Expand Down Expand Up @@ -419,7 +418,7 @@ public void updateActiveSubscriptionsOutbound(List<Mapping> updatedMappings) {
}

if (mapping.isActive() && isDeployed) {
getMappingsDeployedInbound().put(mapping.ident, mapping);
getMappingsDeployedOutbound().put(mapping.ident, mapping);
}
});
}
Expand Down Expand Up @@ -678,20 +677,20 @@ public void collectSubscribedMappingsAll(Map<String, DeploymentMapEntryDetailed>
List<String> subscribedMappingsInbound = getMappingsDeployedInbound().keySet().stream()
.collect(Collectors.toList());
// iterate over all mappings for specific client
subscribedMappingsInbound.forEach(ident -> {
DeploymentMapEntryDetailed mappingDeployed = mappingsDeployed.getOrDefault(ident,
new DeploymentMapEntryDetailed(ident));
subscribedMappingsInbound.forEach(mappingIdent -> {
DeploymentMapEntryDetailed mappingDeployed = mappingsDeployed.getOrDefault(mappingIdent,
new DeploymentMapEntryDetailed(mappingIdent));
mappingDeployed.getConnectors().add(cleanedConfiguration);
mappingsDeployed.put(ident, mappingDeployed);
mappingsDeployed.put(mappingIdent, mappingDeployed);
});
List<String> subscribedMappingsOutbound = getMappingsDeployedOutbound().keySet().stream()
.collect(Collectors.toList());
// iterate over all mappings for specific client
subscribedMappingsOutbound.forEach(ident -> {
DeploymentMapEntryDetailed mappingDeployed = mappingsDeployed.getOrDefault(ident,
new DeploymentMapEntryDetailed(ident));
subscribedMappingsOutbound.forEach(mappingIdent -> {
DeploymentMapEntryDetailed mappingDeployed = mappingsDeployed.getOrDefault(mappingIdent,
new DeploymentMapEntryDetailed(mappingIdent));
mappingDeployed.getConnectors().add(cleanedConfiguration);
mappingsDeployed.put(ident, mappingDeployed);
mappingsDeployed.put(mappingIdent, mappingDeployed);
});
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -208,13 +208,14 @@ public boolean initialize() {

@Override
public void connect() {
updateConnectorStatusAndSend(ConnectorStatus.CONNECTING, true, true);
log.info("Tenant {} - Trying to connect to {} - phase I: (isConnected:shouldConnect) ({}:{})",
tenant, getConnectorName(), isConnected(),
shouldConnect());
if (isConnected())
disconnect();

if (shouldConnect())
updateConnectorStatusAndSend(ConnectorStatus.CONNECTING, true, shouldConnect());
String protocol = (String) connectorConfiguration.getProperties().getOrDefault("protocol", false);
boolean useSelfSignedCertificate = (Boolean) connectorConfiguration.getProperties()
.getOrDefault("useSelfSignedCertificate", false);
Expand Down Expand Up @@ -334,7 +335,7 @@ tenant, getConnectorName(), isConnected(),
updateConnectorStatusAndSend(ConnectorStatus.CONNECTED, true, true);
List<Mapping> updatedMappingsInbound = mappingComponent.rebuildMappingInboundCache(tenant);
updateActiveSubscriptionsInbound(updatedMappingsInbound, true);
List<Mapping> updatedMappingsOutbound = mappingComponent.rebuildMappingInboundCache(tenant);
List<Mapping> updatedMappingsOutbound = mappingComponent.rebuildMappingOutboundCache(tenant);
updateActiveSubscriptionsOutbound(updatedMappingsOutbound);

} catch (Exception e) {
Expand All @@ -350,6 +351,7 @@ tenant, getConnectorName(), isConnected(),
try {
// test if the mqtt connection is configured and enabled
if (shouldConnect()) {
/*
try {
// is not working for broker.emqx.io
subscribe("$SYS/#", QOS.AT_LEAST_ONCE);
Expand All @@ -358,7 +360,7 @@ tenant, getConnectorName(), isConnected(),
"Tenant {} - Error on subscribing to topic $SYS/#, this might not be supported by the mqtt broker {} {}",
e.getMessage(), e);
}

*/
mappingComponent.rebuildMappingOutboundCache(tenant);
// in order to keep MappingInboundCache and ActiveSubscriptionMappingInbound in
// sync, the ActiveSubscriptionMappingInbound is build on the
Expand All @@ -385,7 +387,7 @@ public void close() {
public boolean isConfigValid(ConnectorConfiguration configuration) {
if (configuration == null)
return false;
// if using selfsignied certificate additional properties have to be set
// if using self signed certificate additional properties have to be set
Boolean useSelfSignedCertificate = (Boolean) configuration.getProperties()
.getOrDefault("useSelfSignedCertificate", false);
if (useSelfSignedCertificate && (configuration.getProperties().get("fingerprintSelfSignedCertificate") == null
Expand Down Expand Up @@ -426,9 +428,9 @@ public void disconnect() {
}
});

if (mqttClient.getState().isConnected()) {
mqttClient.unsubscribe(Mqtt3Unsubscribe.builder().topicFilter("$SYS").build());
}
//if (mqttClient.getState().isConnected()) {
// mqttClient.unsubscribe(Mqtt3Unsubscribe.builder().topicFilter("$SYS").build());
//}

try {
if (mqttClient != null && mqttClient.getState().isConnected())
Expand All @@ -440,7 +442,7 @@ public void disconnect() {
updateConnectorStatusAndSend(ConnectorStatus.DISCONNECTED, true, true);
List<Mapping> updatedMappingsInbound = mappingComponent.rebuildMappingInboundCache(tenant);
updateActiveSubscriptionsInbound(updatedMappingsInbound, true);
List<Mapping> updatedMappingsOutbound = mappingComponent.rebuildMappingInboundCache(tenant);
List<Mapping> updatedMappingsOutbound = mappingComponent.rebuildMappingOutboundCache(tenant);
updateActiveSubscriptionsOutbound(updatedMappingsOutbound);
log.info("Tenant {} - Disconnected from MQTT broker II: {}", tenant,
mqttClient.getConfig().getServerHost());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,23 +164,24 @@ public void initialize(MicroserviceSubscriptionAddedEvent event) {
public AConnectorClient initializeConnectorByConfiguration(ConnectorConfiguration connectorConfiguration,
ServiceConfiguration serviceConfiguration, String tenant) throws ConnectorRegistryException {
AConnectorClient connectorClient = null;
try {
connectorClient = configurationRegistry.createConnectorClient(connectorConfiguration,
additionalSubscriptionIdTest, tenant);
} catch (IOException e) {
log.error("Tenant {} - Error on creating connector {} {}", connectorConfiguration.getConnectorType(), e);
throw new ConnectorRegistryException(e.getMessage());
if(connectorConfiguration.isEnabled()) {
try {
connectorClient = configurationRegistry.createConnectorClient(connectorConfiguration,
additionalSubscriptionIdTest, tenant);
} catch (IOException e) {
log.error("Tenant {} - Error on creating connector {} {}", connectorConfiguration.getConnectorType(), e);
throw new ConnectorRegistryException(e.getMessage());
}
connectorRegistry.registerClient(tenant, connectorClient);
// initialize AsynchronousDispatcherInbound
AsynchronousDispatcherInbound dispatcherInbound = new AsynchronousDispatcherInbound(configurationRegistry,
connectorClient);
configurationRegistry.initializePayloadProcessorsInbound(tenant);
connectorClient.setDispatcher(dispatcherInbound);
connectorClient.reconnect();
connectorClient.submitHousekeeping();
initializeOutboundMapping(tenant, serviceConfiguration, connectorClient);
}
connectorRegistry.registerClient(tenant, connectorClient);
// initialize AsynchronousDispatcherInbound
AsynchronousDispatcherInbound dispatcherInbound = new AsynchronousDispatcherInbound(configurationRegistry,
connectorClient);
configurationRegistry.initializePayloadProcessorsInbound(tenant);
connectorClient.setDispatcher(dispatcherInbound);
connectorClient.reconnect();
connectorClient.submitHousekeeping();
initializeOutboundMapping(tenant, serviceConfiguration, connectorClient);

return connectorClient;
}

Expand All @@ -190,15 +191,28 @@ public void initializeOutboundMapping(String tenant, ServiceConfiguration servic
configurationRegistry.initializePayloadProcessorsOutbound(connectorClient);
AsynchronousDispatcherOutbound dispatcherOutbound = new AsynchronousDispatcherOutbound(
configurationRegistry, connectorClient);
configurationRegistry.getNotificationSubscriber().addConnector(tenant, connectorClient.getConnectorIdent(),
dispatcherOutbound);
// Only initialize Connectors which are enabled
if(connectorClient.getConnectorConfiguration().isEnabled())
configurationRegistry.getNotificationSubscriber().addConnector(tenant, connectorClient.getConnectorIdent(),
dispatcherOutbound);
// Subscriber must be new initialized for the new added connector
//configurationRegistry.getNotificationSubscriber().notificationSubscriberReconnect(tenant);

}
}

public void shutdownConnector(String tenant, String connectorIdent) throws ConnectorRegistryException {
//shutdownAndRemoveConnector will unsubscribe the subscriber which drops all queues
public void shutdownAndRemoveConnector(String tenant, String connectorIdent) throws ConnectorRegistryException {
connectorRegistry.unregisterClient(tenant, connectorIdent);
ServiceConfiguration serviceConfiguration = serviceConfigurationComponent.getServiceConfiguration(tenant);
if (serviceConfiguration.isOutboundMappingEnabled()) {
configurationRegistry.getNotificationSubscriber().unsubscribeDeviceSubscriberByConnector(tenant, connectorIdent);
configurationRegistry.getNotificationSubscriber().removeConnector(tenant, connectorIdent);
}
}

//DisableConnector will just clean-up maps and disconnects Notification 2.0 - queues will be kept
public void disableConnector(String tenant, String connectorIdent) throws ConnectorRegistryException {
connectorRegistry.unregisterClient(tenant, connectorIdent);
ServiceConfiguration serviceConfiguration = serviceConfigurationComponent.getServiceConfiguration(tenant);
if (serviceConfiguration.isOutboundMappingEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,12 @@ public void initializePayloadProcessorsOutbound(AConnectorClient connectorClient
processorPerTenant = new HashMap<>();
payloadProcessorsOutbound.put(connectorClient.getTenant(), processorPerTenant);
}
if (!processorPerTenant.containsKey(connectorClient.getConnectorIdent())) {
//if (!processorPerTenant.containsKey(connectorClient.getConnectorIdent())) {
// log.info("Tenant {} - HIER VI {} {}", connectorClient.getTenant(),
// processorPerTenant);
processorPerTenant.put(connectorClient.getConnectorIdent(),
createPayloadProcessorsOutbound(connectorClient));
}
//}
}

public MicroserviceCredentials getMicroserviceCredential(String tenant) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -571,12 +571,12 @@ public List<Mapping> resolveMappingInbound(String tenant, String topic) throws R
.map(mn -> mn.getMapping()).collect(Collectors.toList());
}

public List<Mapping> resolveMappingInbound(String tenant, String topic, String connectorIdent)
throws ResolveException {
List<Mapping> resolvedMappings = resolveMappingInbound(tenant, topic);
resolvedMappings.removeIf(m -> !getDeploymentMapEntry(tenant, m.ident).contains(connectorIdent));
return resolvedMappings;
}
// public List<Mapping> resolveMappingInbound(String tenant, String topic, String connectorIdent)
// throws ResolveException {
// List<Mapping> resolvedMappings = resolveMappingInbound(tenant, topic);
// resolvedMappings.removeIf(m -> !getDeploymentMapEntry(tenant, m.ident).contains(connectorIdent));
// return resolvedMappings;
// }

public void resetSnoop(String tenant, String id) throws Exception {
// step 1. update debug for mapping
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,16 +162,19 @@ public void initDeviceClient() {
if (dispatcherOutboundMaps.get(tenant) != null) {
for (AsynchronousDispatcherOutbound dispatcherOutbound : dispatcherOutboundMaps.get(tenant)
.values()) {
String tokenSeed = DEVICE_SUBSCRIBER
+ dispatcherOutbound.getConnectorClient().getConnectorIdent()
+ additionalSubscriptionIdTest;
String token = createToken(DEVICE_SUBSCRIPTION,
tokenSeed);
deviceTokens.put(dispatcherOutbound.getConnectorClient().getConnectorIdent(), token);
CustomWebSocketClient client = connect(token, dispatcherOutbound);
deviceClientMap.get(tenant).put(dispatcherOutbound.getConnectorClient().getConnectorIdent(),
client);
//Only connect if connector is enabled
if(dispatcherOutbound.getConnectorClient().getConnectorConfiguration().isEnabled()){
String tokenSeed = DEVICE_SUBSCRIBER
+ dispatcherOutbound.getConnectorClient().getConnectorIdent()
+ additionalSubscriptionIdTest;
String token = createToken(DEVICE_SUBSCRIPTION,
tokenSeed);
deviceTokens.put(dispatcherOutbound.getConnectorClient().getConnectorIdent(), token);
CustomWebSocketClient client = connect(token, dispatcherOutbound);
deviceClientMap.get(tenant).put(dispatcherOutbound.getConnectorClient().getConnectorIdent(),
client);

}
}
}
for (NotificationSubscriptionRepresentation subscription : deviceSubList) {
Expand Down Expand Up @@ -450,10 +453,22 @@ public void unsubscribeAllDevices() {
}

public void unsubscribeDeviceSubscriber(String tenant) {
if (deviceTokenPerConnector.get(tenant) != null)
if (deviceTokenPerConnector.get(tenant) != null) {
for (String token : deviceTokenPerConnector.get(tenant).values()) {
tokenApi.unsubscribe(new Token(token));
}
deviceTokenPerConnector.remove(tenant);
}

}

public void unsubscribeDeviceSubscriberByConnector(String tenant, String connectorIdent) {
if (deviceTokenPerConnector.get(tenant) != null) {
if (deviceTokenPerConnector.get(tenant).get(connectorIdent) != null) {
tokenApi.unsubscribe(new Token(deviceTokenPerConnector.get(tenant).get(connectorIdent)));
deviceTokenPerConnector.get(tenant).remove(connectorIdent);
}
}
}

//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,11 @@ public static class MappingInboundTask<T> implements Callable<List<ProcessingCon
ServiceConfiguration serviceConfiguration;
Timer inboundProcessingTimer;
Counter inboundProcessingCounter;
AConnectorClient connectorClient;

public MappingInboundTask(ConfigurationRegistry configurationRegistry, List<Mapping> resolvedMappings,
ConnectorMessage message) {
ConnectorMessage message, AConnectorClient connectorClient) {
this.connectorClient = connectorClient;
this.resolvedMappings = resolvedMappings;
this.mappingComponent = configurationRegistry.getMappingComponent();
this.c8yAgent = configurationRegistry.getC8yAgent();
Expand All @@ -120,7 +122,7 @@ public MappingInboundTask(ConfigurationRegistry configurationRegistry, List<Mapp
.tag("tenant", connectorMessage.getTenant()).description("Total number of inbound messages")
.tag("connector", connectorMessage.getConnectorIdent()).register(Metrics.globalRegistry);

}
}

@Override
public List<ProcessingContext<?>> call() throws Exception {
Expand All @@ -134,7 +136,7 @@ public List<ProcessingContext<?>> call() throws Exception {
.getMappingStatus(tenant, Mapping.UNSPECIFIED_MAPPING);
resolvedMappings.forEach(mapping -> {
// only process active mappings
if (mapping.isActive()) {
if (mapping.isActive() && connectorClient.getMappingsDeployedInbound().containsKey(mapping.ident)) {
MappingStatus mappingStatus = mappingComponent.getMappingStatus(tenant, mapping);

ProcessingContext<?> context;
Expand All @@ -161,15 +163,14 @@ public List<ProcessingContext<?>> call() throws Exception {
inboundProcessingCounter.increment();
processor.deserializePayload(context, connectorMessage);
if (serviceConfiguration.logPayload || mapping.debug) {
log.info(
"Tenant {} - New message on topic: {}, on connector: {}, wrapped message: {}",
log.info("Tenant {} - New message on topic: {}, on connector: {}, wrapped message: {}",
tenant,
context.getTopic(),
connectorMessage.getConnectorIdent(),
connectorClient.getConnectorIdent(),
context.getPayload().toString());
} else {
log.info("Tenant {} - New message on topic: {}, on connector: {}", tenant,
context.getTopic(), connectorMessage.getConnectorIdent());
context.getTopic(), connectorClient.getConnectorIdent());
}
mappingStatus.messagesReceived++;
if (mapping.snoopStatus == SnoopStatus.ENABLED
Expand Down Expand Up @@ -240,8 +241,7 @@ public Future<List<ProcessingContext<?>>> processMessage(ConnectorMessage messag
if (topic != null && !topic.startsWith("$SYS")) {
if (message.getPayload() != null) {
try {
resolvedMappings = mappingComponent.resolveMappingInbound(tenant, topic,
message.getConnectorIdent());
resolvedMappings = mappingComponent.resolveMappingInbound(tenant, topic);
} catch (Exception e) {
log.warn(
"Tenant {} - Error resolving appropriate map for topic {}. Could NOT be parsed. Ignoring this message!",
Expand All @@ -258,7 +258,7 @@ public Future<List<ProcessingContext<?>>> processMessage(ConnectorMessage messag

futureProcessingResult = cachedThreadPool.submit(
new MappingInboundTask(configurationRegistry, resolvedMappings,
message));
message, connectorClient));

return futureProcessingResult;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,11 +233,9 @@ public List<ProcessingContext<?>> call() throws Exception {
connectorClient.getConnectorIdent(),
context.getPayload().toString());
} else {
log.info("Tenant {} - New message for topic: {}, for connector: {},sendPayload: {}",
log.info("Tenant {} - New message for topic: {}, for connector: {}, sendPayload: {}",
tenant,
context.getTopic(),
connectorClient.getConnectorIdent(),
sendPayload);
context.getTopic(), connectorClient.getConnectorIdent(), sendPayload);
}
mappingStatus.messagesReceived++;
if (mapping.snoopStatus == SnoopStatus.ENABLED
Expand Down
Loading

0 comments on commit 9302413

Please sign in to comment.