diff --git a/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/kafka/KafkaClient.java b/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/kafka/KafkaClient.java index 14bd4e8b3..609ee3556 100644 --- a/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/kafka/KafkaClient.java +++ b/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/kafka/KafkaClient.java @@ -184,7 +184,7 @@ public KafkaClient(ConfigurationRegistry configurationRegistry, // defaultPropertiesConsumer.put("sasl.mechanism", "SCRAM-SHA-256"); // defaultPropertiesConsumer.put("sasl.jaas.config", jaasCfg); // defaultPropertiesConsumer.put("linger.ms", 1); - updateConnectorStatusAndSend(ConnectorStatus.UNKNOWN, true, true); + // updateConnectorStatusAndSend(ConnectorStatus.UNKNOWN, true, true); } private String bootstrapServers; @@ -325,7 +325,8 @@ public String getConnectorName() { public void subscribe(String topic, QOS qos) throws ConnectorException { TopicConsumer kafkaConsumer = new TopicConsumer( new TopicConfig(tenant, bootstrapServers, topic, username, password, saslMechanism, groupId, - defaultPropertiesConsumer)); + defaultPropertiesConsumer), + connectorStatus); consumerList.put(topic, kafkaConsumer); TopicConsumerCallback topicConsumerCallback = new TopicConsumerCallback(dispatcher, tenant, getConnectorIdent(), topic, true); diff --git a/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/kafka/TopicConsumer.java b/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/kafka/TopicConsumer.java index 9513a08d3..7a7ae86bf 100644 --- a/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/kafka/TopicConsumer.java +++ b/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/kafka/TopicConsumer.java @@ -23,150 +23,171 @@ import org.apache.kafka.common.errors.TopicAuthorizationException; +import dynamic.mapping.core.ConnectorStatusEvent; +import dynamic.mapping.core.ConnectorStatus; + public class TopicConsumer { - private final TopicConfig topicConfig; - - private ConsumingThread consumingThread; // guarded by this - private boolean closed; // guarded by this - - public TopicConsumer(final TopicConfig topicConfig) { - this.topicConfig = topicConfig; - } - - public synchronized void start(final TopicConsumerListener listener) { - if (closed) { - throw new IllegalStateException("Closed"); - } - - if (consumingThread != null) { - throw new IllegalStateException("Already started"); - } - - final ConsumingThread ct = new ConsumingThread(listener); - ct.start(); - consumingThread = ct; - } - - public void stop() throws InterruptedException { - final ConsumingThread ct; - synchronized (this) { - ct = consumingThread; - - if (ct == null) { - return; - } - - consumingThread = null; - } - - ct.close(); - - if (Thread.currentThread() != ct) { - ct.join(); - } - } - - public boolean shouldStop() { - if (consumingThread == null) - return true; - return consumingThread.shouldStop; - } - - public void close() throws InterruptedException { - synchronized (this) { - if (closed) { - return; - } - closed = true; - } - - stop(); - } - - private class ConsumingThread extends Thread { - private final TopicConsumerListener listener; - private volatile boolean closed; - boolean shouldStop = false; - - ConsumingThread(final TopicConsumerListener listener) { - super("Consumer#" + topicConfig.getBootstrapServers() + "/" + topicConfig.getTopic()); - this.listener = listener; - } - - @Override - public void run() { - Exception error = null; - boolean continueToListen = true; - - while (continueToListen) { - Topic tc = null; - try { - tc = new Topic(topicConfig); - - try { - listener.onStarted(); - } catch (final Exception e) { - // log ("Unexpected error while onStarted() notification", e); - } - - // we consume the events from the topic until - // this thread is interrupted by close() - tc.consumeUntilError(listener); - } catch (final Exception e) { - if (closed) { - break; - } - error = e; - if (error instanceof TopicAuthorizationException) { - continueToListen = false; - shouldStop = true; - } - } finally { - if (tc != null) { - try { - tc.close(); - } catch (final Exception ignore) { - } - } - } - - try { - listener.onStoppedByErrorAndReconnecting(error); - } catch (final Exception e) { - // log ("Unexpected error while onStoppedByErrorAndReconnecting() notification", - // e) - } - - try { - Thread.sleep(5000); // TODO: make the timeout configurable and use backoff with jitter - } catch (final InterruptedException e) { - break; // interrupted by close() - // we don't restore the flag interrupted, since we still need - // to do some additional work like - // to notify listener.onStopped() - } - } - - try { - listener.onStopped(); - } catch (final Exception e) { - // log ("Unexpected error while onStoppedByErrorAndReconnecting() notification", - // e); - } - } - - void close() { - if (closed) { // no atomicity/membars required - return; // since can be called only by one single thread - } - closed = true; - - // We stop the consuming with org.apache.kafka.common.errors.InterruptException - // In here it isn't convenient to call Topic.close() directly to initiate - // org.apache.kafka.common.errors.WakeupException, since we recreate - // the instance of Topic and it takes additional efforts to share the - // changeable reference to a Topic to close it from other thread. - interrupt(); - } - } + private final TopicConfig topicConfig; + private final ConnectorStatusEvent connectorStatus; + + private ConsumingThread consumingThread; // guarded by this + private boolean closed; // guarded by this + + public TopicConsumer(final TopicConfig topicConfig, ConnectorStatusEvent connectorStatus) { + this.topicConfig = topicConfig; + this.connectorStatus = connectorStatus; + } + + public synchronized void start(final TopicConsumerListener listener) { + if (closed) { + throw new IllegalStateException("Closed"); + } + + if (consumingThread != null) { + throw new IllegalStateException("Already started"); + } + + final ConsumingThread ct = new ConsumingThread(listener, connectorStatus); + ct.start(); + consumingThread = ct; + } + + public void stop() throws InterruptedException { + final ConsumingThread ct; + synchronized (this) { + ct = consumingThread; + + if (ct == null) { + return; + } + + consumingThread = null; + } + + ct.close(); + + if (Thread.currentThread() != ct) { + ct.join(); + } + } + + public boolean shouldStop() { + if (consumingThread == null) + return true; + return consumingThread.shouldStop; + } + + public void close() throws InterruptedException { + synchronized (this) { + if (closed) { + return; + } + closed = true; + } + + stop(); + } + + private class ConsumingThread extends Thread { + private static final int WAIT_MS_RECONNECT = 30000; + private final TopicConsumerListener listener; + private final ConnectorStatusEvent connectorStatus; + private volatile boolean closed; + boolean shouldStop = false; + + ConsumingThread(final TopicConsumerListener listener, ConnectorStatusEvent connectorStatus) { + super("Consumer#" + topicConfig.getBootstrapServers() + "/" + topicConfig.getTopic()); + this.listener = listener; + this.connectorStatus = connectorStatus; + + } + + @Override + public void run() { + Exception error = null; + boolean continueToListen = true; + + while (continueToListen) { + Topic tc = null; + try { + tc = new Topic(topicConfig); + + try { + listener.onStarted(); + } catch (final Exception e) { + // log ("Unexpected error while onStarted() notification", e); + } + + // we consume the events from the topic until + // this thread is interrupted by close() + connectorStatus.updateStatus(ConnectorStatus.CONNECTED, true); + tc.consumeUntilError(listener); + } catch (final Exception e) { + if (closed) { + break; + } + error = e; + if (error instanceof TopicAuthorizationException) { + continueToListen = false; + shouldStop = true; + } + } finally { + if (tc != null) { + try { + tc.close(); + } catch (final Exception ignore) { + } + } + } + + try { + listener.onStoppedByErrorAndReconnecting(error); + updateConnectorStatusToFailed(error, topicConfig.getTopic()); + } catch (final Exception e) { + // log ("Unexpected error while onStoppedByErrorAndReconnecting() notification", + // e) + } + + try { + Thread.sleep(WAIT_MS_RECONNECT); // TODO: make the timeout configurable and use backoff with jitter + } catch (final InterruptedException e) { + break; // interrupted by close() + // we don't restore the flag interrupted, since we still need + // to do some additional work like + // to notify listener.onStopped() + } + } + + try { + listener.onStopped(); + } catch (final Exception e) { + // log ("Unexpected error while onStoppedByErrorAndReconnecting() notification", + // e); + } + } + + protected void updateConnectorStatusToFailed(Exception e, String topic) { + String msg = "Topic:" + topic + " --- " + e.getClass().getName() + ": " + + e.getMessage(); + if (!(e.getCause() == null)) { + msg = msg + " --- Caused by " + e.getCause().getClass().getName() + ": " + e.getCause().getMessage(); + } + connectorStatus.setMessage(msg); + connectorStatus.updateStatus(ConnectorStatus.FAILED, false); + } + + void close() { + if (closed) { // no atomicity/membars required + return; // since can be called only by one single thread + } + closed = true; + + // We stop the consuming with org.apache.kafka.common.errors.InterruptException + // In here it isn't convenient to call Topic.close() directly to initiate + // org.apache.kafka.common.errors.WakeupException, since we recreate + // the instance of Topic and it takes additional efforts to share the + // changeable reference to a Topic to close it from other thread. + interrupt(); + } + } } \ No newline at end of file diff --git a/dynamic-mapping-ui/package-lock.json b/dynamic-mapping-ui/package-lock.json index 818d05b38..693a106d6 100644 --- a/dynamic-mapping-ui/package-lock.json +++ b/dynamic-mapping-ui/package-lock.json @@ -1,12 +1,12 @@ { "name": "dynamic-mapping", - "version": "4.5.1", + "version": "4.5.2", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "dynamic-mapping", - "version": "4.5.1", + "version": "4.5.2", "license": "ISC", "dependencies": { "@angular/animations": "^17.3.0", @@ -38,12 +38,12 @@ "@angular-devkit/build-angular": "^17.3.8", "@angular-eslint/eslint-plugin": "^18.1.0", "@angular-eslint/eslint-plugin-template": "^18.3.0", - "@angular-eslint/template-parser": "^17.1.1", + "@angular-eslint/template-parser": "^18.3.0", "@angular/compiler-cli": "^17.3.11", "@angular/language-service": "^18.2.2", "@angular/localize": "^17.3.0", "@angular/service-worker": "^17.3.0", - "@c8y/devkit": "^1020.18.1", + "@c8y/devkit": "^1020.18.3", "@c8y/package-blueprint": "^1020.18.1", "@typescript-eslint/eslint-plugin": "^7.17.0", "cumulocity-cypress": "^0.4.6", @@ -537,23 +537,23 @@ } }, "node_modules/@angular-eslint/template-parser": { - "version": "17.5.2", - "resolved": "https://registry.npmjs.org/@angular-eslint/template-parser/-/template-parser-17.5.2.tgz", - "integrity": "sha512-46emLElmnIUzW0bpEpSf0u05ofRVUwlfttDOMLedhi700peUKbB9Y6iyz3GzAtQCMklBbJC9nR87LQRH9aSlog==", + "version": "18.3.0", + "resolved": "https://registry.npmjs.org/@angular-eslint/template-parser/-/template-parser-18.3.0.tgz", + "integrity": "sha512-1mUquqcnugI4qsoxcYZKZ6WMi6RPelDcJZg2YqGyuaIuhWmi3ZqJZLErSSpjP60+TbYZu7wM8Kchqa1bwJtEaQ==", "dev": true, "dependencies": { - "@angular-eslint/bundled-angular-compiler": "17.5.2", - "eslint-scope": "^8.0.0" + "@angular-eslint/bundled-angular-compiler": "18.3.0", + "eslint-scope": "^8.0.2" }, "peerDependencies": { - "eslint": "^7.20.0 || ^8.0.0", + "eslint": "^8.57.0 || ^9.0.0", "typescript": "*" } }, "node_modules/@angular-eslint/template-parser/node_modules/@angular-eslint/bundled-angular-compiler": { - "version": "17.5.2", - "resolved": "https://registry.npmjs.org/@angular-eslint/bundled-angular-compiler/-/bundled-angular-compiler-17.5.2.tgz", - "integrity": "sha512-K4hVnMyI98faMJmsA4EOBkD0tapDjWV5gy0j/wJ2uSL46d3JgZPZNJSO1zStf/b3kT4gLOlQ/ulWFiUf1DxgIw==", + "version": "18.3.0", + "resolved": "https://registry.npmjs.org/@angular-eslint/bundled-angular-compiler/-/bundled-angular-compiler-18.3.0.tgz", + "integrity": "sha512-v/59FxUKnMzymVce99gV43huxoqXWMb85aKvzlNvLN+ScDu6ZE4YMiTQNpfapVL2lkxhs0uwB3jH17EYd5TcsA==", "dev": true }, "node_modules/@angular-eslint/utils": { @@ -2942,9 +2942,9 @@ } }, "node_modules/@c8y/devkit": { - "version": "1020.18.1", - "resolved": "https://registry.npmjs.org/@c8y/devkit/-/devkit-1020.18.1.tgz", - "integrity": "sha512-39d0qTuj9XJSSysAxNrneELMcLEiWF+dnpEMtVLYoJdRY974Fh93Ryy/6Yd1525TnoMRZ3HckvHEJyA50H4o5g==", + "version": "1020.18.3", + "resolved": "https://registry.npmjs.org/@c8y/devkit/-/devkit-1020.18.3.tgz", + "integrity": "sha512-bhP1Sg+DzQpTRTfEMczrhKCv9vOV1+yvZPlLvAmWCm9FXvJsyHW24HYNUe0uyNdL2oCX0EBnSlN32MQdOTc5kg==", "dev": true, "dependencies": { "@babel/cli": "7.23.9", @@ -2957,8 +2957,8 @@ "@babel/plugin-syntax-dynamic-import": "7.8.3", "@babel/plugin-transform-async-to-generator": "^7.18.6", "@babel/preset-env": "^7.24.4", - "@c8y/client": "1020.18.1", - "@c8y/options": "1020.18.1", + "@c8y/client": "1020.18.3", + "@c8y/options": "1020.18.3", "@schematics/angular": "^17.3.1", "angular-gettext-tools": "2.5.3", "autoprefixer": "10.4.19", @@ -3143,6 +3143,31 @@ "semver": "bin/semver.js" } }, + "node_modules/@c8y/devkit/node_modules/@c8y/client": { + "version": "1020.18.3", + "resolved": "https://registry.npmjs.org/@c8y/client/-/client-1020.18.3.tgz", + "integrity": "sha512-o1qbmndY9sqGB9aJKkRAo7a/ZUJh5YJpYWEi82EStp/1V5sKIRCjzPtQRrdjXauLW137Pvwcrpo9eUbSlxIhCA==", + "dev": true, + "dependencies": { + "@types/cometd": "4.0.8", + "@types/node": "18", + "b2a": "1.1.2", + "cometd": "4.0.8", + "cometd-nodejs-client": "1.0.2", + "cross-fetch": "4.0.0", + "form-data": "4.0.0", + "isomorphic-cometd": "1.1.0" + } + }, + "node_modules/@c8y/devkit/node_modules/@c8y/options": { + "version": "1020.18.3", + "resolved": "https://registry.npmjs.org/@c8y/options/-/options-1020.18.3.tgz", + "integrity": "sha512-2wwfdPdIdhCE9q/bbZL0+puKH513HDfMtC9qvIyTfY1B1hHIEg/xwjn5GyGg4nR5zuLSJzik7OO9BjW7Ati3Mg==", + "dev": true, + "peerDependencies": { + "@c8y/client": "1020.18.3" + } + }, "node_modules/@c8y/devkit/node_modules/ansi-styles": { "version": "4.3.0", "resolved": "https://registry.npmjs.org/ansi-styles/-/ansi-styles-4.3.0.tgz", diff --git a/dynamic-mapping-ui/package.json b/dynamic-mapping-ui/package.json index 9b003f0f0..42bd28daf 100644 --- a/dynamic-mapping-ui/package.json +++ b/dynamic-mapping-ui/package.json @@ -48,12 +48,12 @@ "@angular-devkit/build-angular": "^17.3.8", "@angular-eslint/eslint-plugin": "^18.1.0", "@angular-eslint/eslint-plugin-template": "^18.3.0", - "@angular-eslint/template-parser": "^17.1.1", + "@angular-eslint/template-parser": "^18.3.0", "@angular/compiler-cli": "^17.3.11", "@angular/language-service": "^18.2.2", "@angular/localize": "^17.3.0", "@angular/service-worker": "^17.3.0", - "@c8y/devkit": "^1020.18.1", + "@c8y/devkit": "^1020.18.3", "@c8y/package-blueprint": "^1020.18.1", "@typescript-eslint/eslint-plugin": "^7.17.0", "cumulocity-cypress": "^0.4.6", diff --git a/dynamic-mapping-ui/src/mapping/stepper-mapping/mapping-stepper.component.ts b/dynamic-mapping-ui/src/mapping/stepper-mapping/mapping-stepper.component.ts index 5db33337c..89d240fdc 100644 --- a/dynamic-mapping-ui/src/mapping/stepper-mapping/mapping-stepper.component.ts +++ b/dynamic-mapping-ui/src/mapping/stepper-mapping/mapping-stepper.component.ts @@ -34,13 +34,14 @@ import { FormControl, FormGroup } from '@angular/forms'; import { AlertService, C8yStepper } from '@c8y/ngx-components'; import { FormlyFieldConfig } from '@ngx-formly/core'; import * as _ from 'lodash'; -import { BsModalService } from 'ngx-bootstrap/modal'; +import { BsModalRef, BsModalService } from 'ngx-bootstrap/modal'; import { BehaviorSubject, Subject } from 'rxjs'; import { Content } from 'vanilla-jsoneditor'; import { ExtensionService } from '../../extension'; import { API, COLOR_HIGHLIGHTED, + ConfirmationModalComponent, DeploymentMapEntry, Direction, Extension, @@ -587,7 +588,7 @@ export class MappingStepperComponent implements OnInit, OnDestroy { this.substitutionModel.targetExpression.severity = 'text-info'; } else if (path == '$') { this.substitutionModel.targetExpression.msgTxt = `By specifying "$" you selected the root of the target - template and this rersults in merging the source expression with the target template.`; + template and this result in merging the source expression with the target template.`; this.substitutionModel.targetExpression.severity = 'text-warning'; } } catch (error) { @@ -661,7 +662,39 @@ export class MappingStepperComponent implements OnInit, OnDestroy { }): Promise { // ('OnNextStep', event.step.label, this.mapping); this.step = event.step.label; - if (this.step == 'General settings') { + if (this.step == 'Add and select connector') { + if ( + this.deploymentMapEntry.connectors && + this.deploymentMapEntry.connectors.length == 0 + ) { + const initialState = { + title: 'No connector selected', + message: + 'To apply the mapping to messages you should select at least one connector, unless you want to change this later! Do you want to continue?', + labels: { + ok: 'Continue', + cancel: 'Close' + } + }; + const confirmContinuingModalRef: BsModalRef = this.bsModalService.show( + ConfirmationModalComponent, + { initialState } + ); + confirmContinuingModalRef.content.closeSubject.subscribe( + async (confirmation: boolean) => { + // console.log('Confirmation result:', confirmation); + if (confirmation) { + event.stepper.next(); + } + confirmContinuingModalRef.hide(); + } + ); + // this.alertService.warning( + // 'To apply the mapping to messages you have to select at least one connector. Go back, unless you only want to assign a connector later!' + // ); + } + event.stepper.next(); + } else if (this.step == 'General settings') { this.templateModel.mapping = this.mapping; // console.log( // 'Populate jsonPath if wildcard:', diff --git a/dynamic-mapping-ui/src/mapping/stepper-snooping/snooping-stepper.component.html b/dynamic-mapping-ui/src/mapping/stepper-snooping/snooping-stepper.component.html index 34ed74b2f..35999d8d0 100644 --- a/dynamic-mapping-ui/src/mapping/stepper-snooping/snooping-stepper.component.html +++ b/dynamic-mapping-ui/src/mapping/stepper-snooping/snooping-stepper.component.html @@ -36,6 +36,27 @@ +
+
+
+

+
+ Select connector for snooping (at least one is required) +
+ Selected connectors for snooping + {{ 'Read Only' }} +
+
+

+
+
+
+
{ + // console.log('Confirmation result:', confirmation); + if (confirmation) { + event.stepper.next(); + } + confirmContinuingModalRef.hide(); + } + ); + // this.alertService.warning( + // 'To snoop for messages you have to select at least one connector. Go back, unless you only want to assign a connector later!' + // ); + } } } diff --git a/dynamic-mapping-ui/src/shared/connector-configuration/connector-grid.component.ts b/dynamic-mapping-ui/src/shared/connector-configuration/connector-grid.component.ts index df9128ab9..ddae000f3 100644 --- a/dynamic-mapping-ui/src/shared/connector-configuration/connector-grid.component.ts +++ b/dynamic-mapping-ui/src/shared/connector-configuration/connector-grid.component.ts @@ -25,9 +25,7 @@ import { OnInit, Output, ViewChild, - AfterViewInit, - ElementRef, - Renderer2 + AfterViewInit } from '@angular/core'; import { ActionControl, @@ -105,9 +103,7 @@ export class ConnectorConfigurationComponent implements OnInit, AfterViewInit { constructor( private bsModalService: BsModalService, private connectorConfigurationService: ConnectorConfigurationService, - private alertService: AlertService, - private el: ElementRef, - private renderer: Renderer2 + private alertService: AlertService ) {} ngAfterViewInit(): void { @@ -192,7 +188,7 @@ export class ConnectorConfigurationComponent implements OnInit, AfterViewInit { filterable: false, sortable: true, cellRendererComponent: ConnectorStatusRendererComponent, - gridTrackSize: '20%' + gridTrackSize: '10%' }, { header: 'Enabled', @@ -201,7 +197,7 @@ export class ConnectorConfigurationComponent implements OnInit, AfterViewInit { filterable: false, sortable: true, cellRendererComponent: StatusEnabledRendererComponent, - gridTrackSize: '15%' + gridTrackSize: '10%' } ); this.selected = this.deploymentMapEntry?.connectors ?? [];