From a02bef1d3123476c65ac9b9cef98911af7553d73 Mon Sep 17 00:00:00 2001 From: vga91 Date: Wed, 11 Dec 2024 14:54:38 +0100 Subject: [PATCH] cleanup --- .../database-integration/kafka/cloud.adoc | 2 +- .../database-integration/kafka/index.adoc | 115 +------ .../kafka/procedures.adoc | 4 +- .../kafka/producer-configuration.adoc | 2 +- .../partials/producer-data/node.created.json | 37 --- .../partials/producer-data/node.deleted.json | 39 --- .../partials/producer-data/node.updated.json | 46 --- .../producer-data/relationship.created.json | 49 --- .../producer-data/relationship.deleted.json | 51 --- .../producer-data/relationship.updated.json | 56 ---- .../main/java/apoc/ExtendedApocConfig.java | 19 -- .../apoc/ExtendedApocGlobalComponents.java | 1 + .../src/main/java/apoc/generate/Generate.java | 2 +- extended/src/main/java/apoc/load/Jdbc.java | 2 +- .../kotlin/apoc/kafka/PublishProcedures.kt | 5 - .../apoc/kafka/consumer/StreamsEventSink.kt | 6 +- .../StreamsSinkConfigurationListener.kt | 31 +- .../kafka/consumer/StreamsTopicService.kt | 95 ------ .../kafka/KafkaAutoCommitEventConsumer.kt | 4 - .../kafka/consumer/kafka/KafkaEventSink.kt | 36 +-- .../consumer/kafka/KafkaSinkConfiguration.kt | 26 +- .../procedures/QueueBasedSpliterator.kt | 67 ---- .../procedures/StreamsSinkProcedures.kt | 11 +- .../kotlin/apoc/kafka/events/KafkaStatus.kt | 3 + .../apoc/kafka/events/StreamsPluginStatus.kt | 3 - .../apoc/kafka/extensions/CommonExtensions.kt | 18 +- .../events/PreviousTransactionData.kt | 294 ------------------ .../kafka/producer/kafka/KafkaEventRouter.kt | 13 +- 28 files changed, 48 insertions(+), 989 deletions(-) delete mode 100644 docs/asciidoc/modules/ROOT/partials/producer-data/node.created.json delete mode 100644 docs/asciidoc/modules/ROOT/partials/producer-data/node.deleted.json delete mode 100644 docs/asciidoc/modules/ROOT/partials/producer-data/node.updated.json delete mode 100644 docs/asciidoc/modules/ROOT/partials/producer-data/relationship.created.json delete mode 100644 docs/asciidoc/modules/ROOT/partials/producer-data/relationship.deleted.json delete mode 100644 docs/asciidoc/modules/ROOT/partials/producer-data/relationship.updated.json delete mode 100644 extended/src/main/kotlin/apoc/kafka/consumer/StreamsTopicService.kt delete mode 100644 extended/src/main/kotlin/apoc/kafka/consumer/procedures/QueueBasedSpliterator.kt create mode 100644 extended/src/main/kotlin/apoc/kafka/events/KafkaStatus.kt delete mode 100644 extended/src/main/kotlin/apoc/kafka/events/StreamsPluginStatus.kt delete mode 100644 extended/src/main/kotlin/apoc/kafka/producer/events/PreviousTransactionData.kt diff --git a/docs/asciidoc/modules/ROOT/pages/database-integration/kafka/cloud.adoc b/docs/asciidoc/modules/ROOT/pages/database-integration/kafka/cloud.adoc index f72a37e6c8..1965cf37cb 100644 --- a/docs/asciidoc/modules/ROOT/pages/database-integration/kafka/cloud.adoc +++ b/docs/asciidoc/modules/ROOT/pages/database-integration/kafka/cloud.adoc @@ -10,7 +10,7 @@ At a minimum, to configure this, you will need: * `API_KEY` * `API_SECRET` -More specifically the plugin has to be configured as follow: +More specifically, the procedures have to be configured as follows: .neo4j.conf [source,ini] ---- diff --git a/docs/asciidoc/modules/ROOT/pages/database-integration/kafka/index.adoc b/docs/asciidoc/modules/ROOT/pages/database-integration/kafka/index.adoc index c90b16858f..a2d5d2dde8 100644 --- a/docs/asciidoc/modules/ROOT/pages/database-integration/kafka/index.adoc +++ 
b/docs/asciidoc/modules/ROOT/pages/database-integration/kafka/index.adoc @@ -2,39 +2,30 @@ [[kafka]] -ifdef::env-docs[] -[abstract] --- -Get started fast for common scenarios, using APOC Kafka plugin or Kafka Connect plugin --- -endif::env-docs[] [[apoc_neo4j_plugin_quickstart]] -== APOC Kafka Plugin +== APOC Kafka Procedures -Any configuration option that starts with `apoc.kafka.` controls how the plugin itself behaves. For a full -list of options available, see the documentation subsections on the xref:database-integration/kafka/producer.adoc[source] and xref:database-integration/kafka/consumer.adoc#apoc_kafka_sink[sink]. +NOTE: To enable the Kafka dependencies, set the APOC configuration `apoc.kafka.enabled=true` -=== Install the Plugin +Any configuration option that starts with `apoc.kafka.` controls how the procedures themselves behave. -This dependency is included in https://github.com/neo4j-contrib/neo4j-apoc-procedures/releases/download/{apoc-release}/apoc-kafka-dependencies-{apoc-release}-all.jar[apoc-kafka-dependencies-{apoc-release}-all.jar^], which can be downloaded from the https://github.com/neo4j-contrib/neo4j-apoc-procedures/releases/tag/{apoc-release}[releases page^]. +=== Install dependencies + +The Kafka dependencies are included in https://github.com/neo4j-contrib/neo4j-apoc-procedures/releases/download/{apoc-release}/apoc-kafka-dependencies-{apoc-release}-all.jar[apoc-kafka-dependencies-{apoc-release}-all.jar^], which can be downloaded from the https://github.com/neo4j-contrib/neo4j-apoc-procedures/releases/tag/{apoc-release}[releases page^]. Once that file is downloaded, it should be placed in the `plugins` directory and the Neo4j Server restarted. [[kafka-settings]] === Kafka settings Any configuration option that starts with `apoc.kafka.` will be passed to the underlying Kafka driver. Neo4j -Kafka plugin uses the official Confluent Kafka producer and consumer java clients. +Kafka procedures use the official Confluent Kafka producer and consumer Java clients. Configuration settings which are valid for those connectors will also work for APOC Kafka. For example, in the Kafka documentation linked below, the configuration setting named `batch.size` should be stated as `apoc.kafka.batch.size` in APOC Kafka. -The following are common configuration settings you may wish to use. _This is not a complete -list_. The full list of configuration options and reference material is available from Confluent's -site for link:{url-confluent-install}/configuration/consumer-configs.html[sink configurations] and -link:{url-confluent-install}/configuration/producer-configs.html[source configurations]. - +The following are common configuration settings you may wish to use. .Most Common Needed Configuration Settings |=== |Setting Name |Description |Default Value @@ -46,7 +37,7 @@ larger transactions in Neo4j memory and may improve throughput. |apoc.kafka.buffer.memory |The total bytes of memory the producer can use to buffer records waiting. 
Use this to adjust -how much memory the plugin may require to hold messages not yet delivered to Neo4j +how much memory the procedures may require to hold messages not yet delivered to Neo4j |33554432 |apoc.kafka.batch.size @@ -75,94 +66,8 @@ apoc.kafka.bootstrap.servers=localhost:9092 If you are using Confluent Cloud (managed Kafka), you can connect to Kafka as described in the xref:database-integration/kafka/cloud.adoc#confluent_cloud[Confluent Cloud] section -=== Decide: Sink, Source, or Both - -Configuring APOC Neo4j plugin comes in three different parts, depending on your need: - -. *Required*: Configuring a connection to Kafka - -.neo4j.conf -[source,ini] ----- -apoc.kafka.bootstrap.servers=localhost:9092 ----- - -. _Optional_: Configuring Neo4j to produce records to Kafka (xref:database-integration/kafka/producer.adoc[Source]) -. _Optional_: Configuring Neo4j to ingest from Kafka (xref:database-integration/kafka/consumer.adoc#apoc_kafka_sink[Sink]) - -Follow one or both subsections according to your use case and need: - -==== Sink - -Take data from Kafka and store it in Neo4j (Neo4j as a data consumer) by adding configuration such as: - -.neo4j.conf -[source,ini] ----- -apoc.kafka.sink.enabled=true -apoc.kafka.sink.topic.cypher.my-ingest-topic=MERGE (n:Label {id: event.id}) ON CREATE SET n += event.properties ----- - -This will process every message that comes in on `my-ingest-topic` with the given cypher statement. When -that cypher statement executes, the `event` variable that is referenced will be set to the message received, -so this sample cypher will create a `(:Label)` node in the graph with the given ID, copying all of the -properties in the source message. - -For full details on what you can do here, see the xref:database-integration/kafka/consumer.adoc#apoc_kafka_sink[Sink] section of the documentation. - -==== Source - -Produce data from Neo4j and send it to a Kafka topic (Neo4j as a data producer) by adding configuration such as: - -.neo4j.conf -[source,ini] ----- -apoc.kafka.source.topic.nodes.my-nodes-topic=Person{*} -apoc.kafka.source.topic.relationships.my-rels-topic=BELONGS-TO{*} -apoc.kafka.source.enabled=true -apoc.kafka.source.schema.polling.interval=10000 ----- - -This will produce all graph nodes labeled `(:Person)` on to the topic `my-nodes-topic` and all -relationships of type `-[:BELONGS-TO]->` to the topic named `my-rels-topic`. Further, schema changes will -be polled every 10,000 ms, which affects how quickly the database picks up new indexes/schema changes. -Please note that if not specified a value for `apoc.kafka.source.schema.polling.interval` property then Streams plugin will use -300,000 ms as default. - -The expressions `Person{\*}` and `BELONGS-TO{*}` are _patterns_. You can find documentation on how to change -these in the xref:database-integration/kafka/producer.adoc#source-patterns[Patterns] section. - -For full details on what you can do here, see the xref:database-integration/kafka/producer.adoc[Source] section of the documentation. ==== Restart Neo4j Once the plugin is installed and configured, restarting the database will make it active. -If you have configured Neo4j to consume from kafka, it will begin immediately processing messages. - -[NOTE] - -==== -When installing the latest version of the APOC Kafka plugin into Neo4j 4.x, watching to logs you could find something -similar to the following: - -[source,logs] ----- -2020-03-25 20:13:50.606+0000 WARN Unrecognized setting. 
No declared setting with name: apoc.kafka.max.partition.fetch.bytes -2020-03-25 20:13:50.608+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.sink.errors.log.include.messages -2020-03-25 20:13:50.608+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.auto.offset.reset -2020-03-25 20:13:50.608+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.bootstrap.servers -2020-03-25 20:13:50.608+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.max.poll.records -2020-03-25 20:13:50.609+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.sink.errors.log.enable -2020-03-25 20:13:50.609+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.source.enabled -2020-03-25 20:13:50.609+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.sink.topic.cypher.boa.to.kafkaTest -2020-03-25 20:13:50.609+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.sink.errors.tolerance -2020-03-25 20:13:50.609+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.group.id -2020-03-25 20:13:50.609+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.sink.errors.deadletterqueue.context.headers.enable -2020-03-25 20:13:50.609+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.sink.errors.deadletterqueue.context.header.prefix -2020-03-25 20:13:50.610+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.sink.errors.deadletterqueue.topic.name -2020-03-25 20:13:50.610+0000 WARN Unrecognized setting. No declared setting with name: apoc.kafka.sink.enabled.to.kafkaTest ---- -*These are not errors*. They comes from the new Neo4j 4 Configuration System, which warns that it doesn't recognize those -properties. Despite these warnings the plugin will work properly. -==== \ No newline at end of file +If you have configured Neo4j to consume from Kafka, it will immediately begin processing messages. \ No newline at end of file diff --git a/docs/asciidoc/modules/ROOT/pages/database-integration/kafka/procedures.adoc b/docs/asciidoc/modules/ROOT/pages/database-integration/kafka/procedures.adoc index 5743e8a8d5..00198fbf8c 100644 --- a/docs/asciidoc/modules/ROOT/pages/database-integration/kafka/procedures.adoc +++ b/docs/asciidoc/modules/ROOT/pages/database-integration/kafka/procedures.adoc @@ -5,12 +5,12 @@ ifdef::env-docs[] [abstract] -- This chapter describes the APOC Kafka Procedures in the APOC Kafka Library. -Use this section to configure Neo4j to know how procedures allow the functionality of the plugin +Use this section to learn how the procedures allow this functionality to be used ad-hoc in any Cypher query. -- endif::env-docs[] -The APOC Kafka plugin comes out with a list of procedures. +APOC Kafka comes with a set of procedures. 
== Configuration diff --git a/docs/asciidoc/modules/ROOT/pages/database-integration/kafka/producer-configuration.adoc b/docs/asciidoc/modules/ROOT/pages/database-integration/kafka/producer-configuration.adoc index 61fda86156..2db49d2498 100644 --- a/docs/asciidoc/modules/ROOT/pages/database-integration/kafka/producer-configuration.adoc +++ b/docs/asciidoc/modules/ROOT/pages/database-integration/kafka/producer-configuration.adoc @@ -32,7 +32,7 @@ In case you Kafka broker is configured with `auto.create.topics.enable` to `fals all the messages sent to topics that don't exist are discarded; this because the `KafkaProducer.send()` method blocks the execution, as explained in https://issues.apache.org/jira/browse/KAFKA-3539[KAFKA-3539]. You can tune the custom property `kafka.topic.discovery.polling.interval` in order to -periodically check for new topics into the Kafka cluster so the plugin will be able +periodically check for new topics into the Kafka cluster so the procedures will be able to send events to the defined topics. diff --git a/docs/asciidoc/modules/ROOT/partials/producer-data/node.created.json b/docs/asciidoc/modules/ROOT/partials/producer-data/node.created.json deleted file mode 100644 index b897e7bc7a..0000000000 --- a/docs/asciidoc/modules/ROOT/partials/producer-data/node.created.json +++ /dev/null @@ -1,37 +0,0 @@ -{ - "meta": { - "timestamp": 1532597182604, - "username": "neo4j", - "tx_id": 3, - "tx_event_id": 0, - "tx_events_count": 2, - "operation": "created", - "source": { - "hostname": "neo4j.mycompany.com" - } - }, - "payload": { - "id": "1004", - "type": "node", - "after": { - "labels": ["Person"], - "properties": { - "email": "annek@noanswer.org", - "last_name": "Kretchmar", - "first_name": "Anne Marie" - } - } - }, - "schema": { - "properties": { - "last_name": "String", - "email": "String", - "first_name": "String" - }, - "constraints": [{ - "label": "Person", - "properties": ["first_name", "last_name"], - "type": "UNIQUE" - }] - } -} \ No newline at end of file diff --git a/docs/asciidoc/modules/ROOT/partials/producer-data/node.deleted.json b/docs/asciidoc/modules/ROOT/partials/producer-data/node.deleted.json deleted file mode 100644 index 8d3f08d3f1..0000000000 --- a/docs/asciidoc/modules/ROOT/partials/producer-data/node.deleted.json +++ /dev/null @@ -1,39 +0,0 @@ -{ - "meta": { - "timestamp": 1532597182604, - "username": "neo4j", - "tx_id": 3, - "tx_event_id": 0, - "tx_events_count": 2, - "operation": "deleted", - "source": { - "hostname": "neo4j.mycompany.com" - } - }, - "payload": { - "id": "1004", - "type": "node", - "before": { - "labels": ["Person"], - "properties": { - "last_name": "Kretchmar", - "email": "annek@noanswer.org", - "first_name": "Anne Marie", - "geo": { "crs": "wgs-84-3d", "latitude": 46.2222, "longitude": 32.11111, "height": 0.123 } - } - } - }, - "schema": { - "properties": { - "last_name": "String", - "email": "String", - "first_name": "String", - "geo": "point" - }, - "constraints": [{ - "label": "Person", - "properties": ["first_name", "last_name"], - "type": "UNIQUE" - }] - } -} \ No newline at end of file diff --git a/docs/asciidoc/modules/ROOT/partials/producer-data/node.updated.json b/docs/asciidoc/modules/ROOT/partials/producer-data/node.updated.json deleted file mode 100644 index 080cc5cbda..0000000000 --- a/docs/asciidoc/modules/ROOT/partials/producer-data/node.updated.json +++ /dev/null @@ -1,46 +0,0 @@ -{ - "meta": { - "timestamp": 1532597182604, - "username": "neo4j", - "tx_id": 3, - "tx_event_id": 0, - "tx_events_count": 2, - 
"operation": "updated", - "source": { - "hostname": "neo4j.mycompany.com" - } - }, - "payload": { - "id": "1004", - "type": "node", - "before": { - "labels": ["Person", "Tmp"], - "properties": { - "email": "annek@noanswer.org", - "last_name": "Kretchmar", - "first_name": "Anne" - } - }, - "after": { - "labels": ["Person"], - "properties": { - "last_name": "Kretchmar", - "email": "annek@noanswer.org", - "first_name": "Anne Marie", - "geo": { "crs": "wgs-84-3d", "latitude": 46.2222, "longitude": 32.11111, "height": 0.123 } - } - } - }, - "schema": { - "properties": { - "last_name": "String", - "email": "String", - "first_name": "String" - }, - "constraints": [{ - "label": "Person", - "properties": ["first_name", "last_name"], - "type": "UNIQUE" - }] - } -} \ No newline at end of file diff --git a/docs/asciidoc/modules/ROOT/partials/producer-data/relationship.created.json b/docs/asciidoc/modules/ROOT/partials/producer-data/relationship.created.json deleted file mode 100644 index d8d9c9f666..0000000000 --- a/docs/asciidoc/modules/ROOT/partials/producer-data/relationship.created.json +++ /dev/null @@ -1,49 +0,0 @@ -{ - "meta": { - "timestamp": 1532597182604, - "username": "neo4j", - "tx_id": 3, - "tx_event_id": 0, - "tx_events_count": 2, - "operation": "created", - "source": { - "hostname": "neo4j.mycompany.com" - } - }, - "payload": { - "id": "123", - "type": "relationship", - "label": "KNOWS", - "start": { - "labels": ["Person"], - "id": "123", - "ids": { - "last_name": "Andrea", - "first_name": "Santurbano" - } - }, - "end": { - "labels": ["Person"], - "id": "456", - "ids": { - "last_name": "Michael", - "first_name": "Hunger" - } - }, - "after": { - "properties": { - "since": "2018-04-05T12:34:00[Europe/Berlin]" - } - } - }, - "schema": { - "properties": { - "since": "ZonedDateTime" - }, - "constraints": [{ - "label": "KNOWS", - "properties": ["since"], - "type": "RELATIONSHIP_PROPERTY_EXISTS" - }] - } -} diff --git a/docs/asciidoc/modules/ROOT/partials/producer-data/relationship.deleted.json b/docs/asciidoc/modules/ROOT/partials/producer-data/relationship.deleted.json deleted file mode 100644 index 8e54eba546..0000000000 --- a/docs/asciidoc/modules/ROOT/partials/producer-data/relationship.deleted.json +++ /dev/null @@ -1,51 +0,0 @@ -{ - "meta": { - "timestamp": 1532597182604, - "username": "neo4j", - "tx_id": 3, - "tx_event_id": 0, - "tx_events_count": 2, - "operation": "deleted", - "source": { - "hostname": "neo4j.mycompany.com" - } - }, - "payload": { - "id": "123", - "type": "relationship", - "label": "KNOWS", - "start": { - "labels": ["Person"], - "id": "123", - "ids": { - "last_name": "Andrea", - "first_name": "Santurbano" - } - }, - "end": { - "labels": ["Person"], - "id": "456", - "ids": { - "last_name": "Michael", - "first_name": "Hunger" - } - }, - "before": { - "properties": { - "since": "2018-04-05T12:34:00[Europe/Berlin]", - "to": "2019-04-05T23:00:00[Europe/Berlin]" - } - } - }, - "schema": { - "properties": { - "since": "ZonedDateTime", - "to": "ZonedDateTime" - }, - "constraints": [{ - "label": "KNOWS", - "properties": ["since"], - "type": "RELATIONSHIP_PROPERTY_EXISTS" - }] - } -} diff --git a/docs/asciidoc/modules/ROOT/partials/producer-data/relationship.updated.json b/docs/asciidoc/modules/ROOT/partials/producer-data/relationship.updated.json deleted file mode 100644 index 369f09c82b..0000000000 --- a/docs/asciidoc/modules/ROOT/partials/producer-data/relationship.updated.json +++ /dev/null @@ -1,56 +0,0 @@ -{ - "meta": { - "timestamp": 1532597182604, - "username": "neo4j", 
- "tx_id": 3, - "tx_event_id": 0, - "tx_events_count": 2, - "operation": "updated", - "source": { - "hostname": "neo4j.mycompany.com" - } - }, - "payload": { - "id": "123", - "type": "relationship", - "label": "KNOWS", - "start": { - "labels": ["Person"], - "id": "123", - "ids": { - "last_name": "Andrea", - "first_name": "Santurbano" - } - }, - "end": { - "labels": ["Person"], - "id": "456", - "ids": { - "last_name": "Michael", - "first_name": "Hunger" - } - }, - "before": { - "properties": { - "since": "2018-04-05T12:34:00[Europe/Berlin]" - } - }, - "after": { - "properties": { - "since": "2018-04-05T12:34:00[Europe/Berlin]", - "to": "2019-04-05T23:00:00[Europe/Berlin]" - } - } - }, - "schema": { - "properties": { - "since": "ZonedDateTime", - "to": "ZonedDateTime" - }, - "constraints": [{ - "label": "KNOWS", - "properties": ["since"], - "type": "RELATIONSHIP_PROPERTY_EXISTS" - }] - } -} diff --git a/extended/src/main/java/apoc/ExtendedApocConfig.java b/extended/src/main/java/apoc/ExtendedApocConfig.java index e89cc12241..5cc43e5bcc 100644 --- a/extended/src/main/java/apoc/ExtendedApocConfig.java +++ b/extended/src/main/java/apoc/ExtendedApocConfig.java @@ -74,25 +74,6 @@ public enum UuidFormatType { hex, base64 } public static final String CONFIG_DIR = "config-dir="; - private static final String CONF_DIR_ARG = "config-dir="; - private static final String SOURCE_ENABLED = "apoc.kafka.source.enabled"; - private static final boolean SOURCE_ENABLED_VALUE = true; - private static final String PROCEDURES_ENABLED = "apoc.kafka.procedures.enabled"; - private static final boolean PROCEDURES_ENABLED_VALUE = true; - private static final String SINK_ENABLED = "apoc.kafka.sink.enabled"; - private static final boolean SINK_ENABLED_VALUE = false; - private static final String CHECK_APOC_TIMEOUT = "apoc.kafka.check.apoc.timeout"; - private static final String CHECK_APOC_INTERVAL = "apoc.kafka.check.apoc.interval"; - private static final String CLUSTER_ONLY = "apoc.kafka.cluster.only"; - private static final String CHECK_WRITEABLE_INSTANCE_INTERVAL = "apoc.kafka.check.writeable.instance.interval"; - private static final String SYSTEM_DB_WAIT_TIMEOUT = "apoc.kafka.systemdb.wait.timeout"; - private static final long SYSTEM_DB_WAIT_TIMEOUT_VALUE = 10000L; - private static final String POLL_INTERVAL = "apoc.kafka.sink.poll.interval"; - private static final String INSTANCE_WAIT_TIMEOUT = "apoc.kafka.wait.timeout"; - private static final long INSTANCE_WAIT_TIMEOUT_VALUE = 120000L; - private static final int DEFAULT_TRIGGER_PERIOD = 10000; - private static final String DEFAULT_PATH = "."; - public ExtendedApocConfig(LogService log, GlobalProcedures globalProceduresRegistry, String defaultConfigPath) { this.log = log.getInternalLog(ApocConfig.class); this.defaultConfigPath = defaultConfigPath; diff --git a/extended/src/main/java/apoc/ExtendedApocGlobalComponents.java b/extended/src/main/java/apoc/ExtendedApocGlobalComponents.java index bd9c1063a3..daf064fef3 100644 --- a/extended/src/main/java/apoc/ExtendedApocGlobalComponents.java +++ b/extended/src/main/java/apoc/ExtendedApocGlobalComponents.java @@ -56,6 +56,7 @@ public Map getServices(GraphDatabaseAPI db, ApocExtensionFact serviceMap.put("cypherProcedures", cypherProcedureHandler); + // add kafkaHandler only if apoc.kafka.enabled=true boolean isKafkaEnabled = dependencies.apocConfig().getConfig().getBoolean(APOC_KAFKA_ENABLED, false); if (isKafkaEnabled) { try { diff --git a/extended/src/main/java/apoc/generate/Generate.java 
b/extended/src/main/java/apoc/generate/Generate.java index 0a4005ffe5..16ca04011e 100644 --- a/extended/src/main/java/apoc/generate/Generate.java +++ b/extended/src/main/java/apoc/generate/Generate.java @@ -70,7 +70,7 @@ public void complete(@Name("noNodes") Long noNodes, @Name("label") String label, @Procedure(name = "apoc.generate.simple",mode = Mode.WRITE) @Description("apoc.generate.simple(degrees, label, type) - generates a simple random graph according to the given degree distribution") public void simple(@Name("degrees") List degrees, @Name("label") String label, @Name("type") String relationshipType) throws IOException { - if (degrees == null) degrees = java.util.Arrays.asList(2L, 2L, 2L, 2L); + if (degrees == null) degrees = Arrays.asList(2L, 2L, 2L, 2L); List intDegrees = degrees.stream().map(Long::intValue).collect(Collectors.toList()); diff --git a/extended/src/main/java/apoc/load/Jdbc.java b/extended/src/main/java/apoc/load/Jdbc.java index 0d9b8b209b..302874d736 100644 --- a/extended/src/main/java/apoc/load/Jdbc.java +++ b/extended/src/main/java/apoc/load/Jdbc.java @@ -115,7 +115,7 @@ private Stream executeQuery(String urlOrKey, String tableOrSelect, Ma } } - @Procedure(mode = Mode.WRITE) + @Procedure(mode = Mode.DBMS) @Description("apoc.load.jdbcUpdate('key or url','statement',[params],config) YIELD row - update relational database, from a SQL statement with optional parameters") public Stream jdbcUpdate(@Name("jdbc") String urlOrKey, @Name("query") String query, @Name(value = "params", defaultValue = "[]") List params, @Name(value = "config",defaultValue = "{}") Map config) { log.info( String.format( "Executing SQL update: %s", query ) ); diff --git a/extended/src/main/kotlin/apoc/kafka/PublishProcedures.kt b/extended/src/main/kotlin/apoc/kafka/PublishProcedures.kt index fcc77aca3b..4cc4764c0d 100644 --- a/extended/src/main/kotlin/apoc/kafka/PublishProcedures.kt +++ b/extended/src/main/kotlin/apoc/kafka/PublishProcedures.kt @@ -1,8 +1,5 @@ package apoc.kafka -//import apoc.kafka.producer.StreamsEventRouter -//import apoc.kafka.producer.StreamsTransactionEventHandler -//import apoc.kafka.producer.StreamsTransactionEventHandler import apoc.kafka.producer.events.StreamsEventBuilder import apoc.kafka.producer.kafka.KafkaEventRouter import apoc.kafka.utils.KafkaUtil @@ -21,7 +18,6 @@ import java.util.stream.Stream data class StreamPublishResult(@JvmField val value: Map) data class StreamsEventSinkStoreEntry(val eventRouter: KafkaEventRouter, -// val txHandler: StreamsTransactionEventHandler ) class PublishProcedures { @@ -101,7 +97,6 @@ class PublishProcedures { fun register( db: GraphDatabaseAPI, evtRouter: KafkaEventRouter, -// txHandler: StreamsTransactionEventHandler ) { streamsEventRouterStore[KafkaUtil.getName(db)] = StreamsEventSinkStoreEntry(evtRouter/*, txHandler*/) } diff --git a/extended/src/main/kotlin/apoc/kafka/consumer/StreamsEventSink.kt b/extended/src/main/kotlin/apoc/kafka/consumer/StreamsEventSink.kt index 1b9c99eaac..ab28496299 100644 --- a/extended/src/main/kotlin/apoc/kafka/consumer/StreamsEventSink.kt +++ b/extended/src/main/kotlin/apoc/kafka/consumer/StreamsEventSink.kt @@ -3,12 +3,10 @@ package apoc.kafka.consumer import apoc.kafka.consumer.kafka.KafkaEventSink import org.neo4j.kernel.internal.GraphDatabaseAPI import org.neo4j.logging.Log -import apoc.kafka.events.StreamsPluginStatus object StreamsEventSinkFactory { - fun getStreamsEventSink(config: Map, //streamsQueryExecution: StreamsEventSinkQueryExecution, - /* streamsTopicService: 
StreamsTopicService, */log: Log, db: GraphDatabaseAPI): KafkaEventSink { - return KafkaEventSink(/*config, streamsQueryExecution, streamsTopicService, log, */db) + fun getStreamsEventSink(config: Map, log: Log, db: GraphDatabaseAPI): KafkaEventSink { + return KafkaEventSink(db) } } diff --git a/extended/src/main/kotlin/apoc/kafka/consumer/StreamsSinkConfigurationListener.kt b/extended/src/main/kotlin/apoc/kafka/consumer/StreamsSinkConfigurationListener.kt index 26a15395ff..21a82a86a4 100644 --- a/extended/src/main/kotlin/apoc/kafka/consumer/StreamsSinkConfigurationListener.kt +++ b/extended/src/main/kotlin/apoc/kafka/consumer/StreamsSinkConfigurationListener.kt @@ -1,57 +1,28 @@ package apoc.kafka.consumer -import apoc.kafka.config.StreamsConfig import apoc.kafka.consumer.kafka.KafkaEventSink -import apoc.kafka.consumer.kafka.KafkaSinkConfiguration import apoc.kafka.consumer.procedures.StreamsSinkProcedures -import apoc.kafka.consumer.utils.ConsumerUtils -import apoc.kafka.extensions.isDefaultDb -import apoc.kafka.utils.KafkaUtil -import apoc.kafka.utils.KafkaUtil.getProducerProperties -import kotlinx.coroutines.sync.Mutex import org.neo4j.kernel.internal.GraphDatabaseAPI import org.neo4j.logging.Log class StreamsSinkConfigurationListener(private val db: GraphDatabaseAPI, private val log: Log) { -// private val mutex = Mutex() -// var eventSink: KafkaEventSink? = null -// -// private val streamsTopicService = StreamsTopicService() -// -// private var lastConfig: KafkaSinkConfiguration? = null -// -// private val producerConfig = getProducerProperties() -// -// private fun KafkaSinkConfiguration.excludeSourceProps() = this.asProperties() -// ?.filterNot { producerConfig.contains(it.key) || it.key.toString().startsWith("apoc.kafka.source") } fun shutdown() { -// val isShuttingDown = eventSink != null -// if (isShuttingDown) { -// log.info("[Sink] Shutting down the Streams Sink Module") -// } -// eventSink?.stop() -// eventSink = null StreamsSinkProcedures.unregisterStreamsEventSink(db) -// if (isShuttingDown) { -// log.info("[Sink] Shutdown of the Streams Sink Module completed") -// } + } fun start(configMap: Map) { eventSink = StreamsEventSinkFactory .getStreamsEventSink(configMap, - // streamsQueryExecution, - // streamsTopicService, log, db) -// log.info("[Sink] Registering the Streams Sink procedures") StreamsSinkProcedures.registerStreamsEventSink(db, eventSink!!) 
} diff --git a/extended/src/main/kotlin/apoc/kafka/consumer/StreamsTopicService.kt b/extended/src/main/kotlin/apoc/kafka/consumer/StreamsTopicService.kt deleted file mode 100644 index f6fae70065..0000000000 --- a/extended/src/main/kotlin/apoc/kafka/consumer/StreamsTopicService.kt +++ /dev/null @@ -1,95 +0,0 @@ -package apoc.kafka.consumer - -import kotlinx.coroutines.runBlocking -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock -import apoc.kafka.service.TopicType -import apoc.kafka.service.Topics -import java.util.Collections -import java.util.concurrent.ConcurrentHashMap - -class StreamsTopicService { - - private val storage = ConcurrentHashMap() - - private val mutex = Mutex() - - fun clearAll() { - storage.clear() - } - - private fun throwRuntimeException(data: Any, topicType: TopicType): Unit = - throw RuntimeException("Unsupported data $data for topic type $topicType") - - fun set(topicType: TopicType, data: Any) = runBlocking { - mutex.withLock { - var oldData = storage[topicType] - oldData = oldData ?: when (data) { - is Map<*, *> -> emptyMap() - is Collection<*> -> emptyList() - else -> throwRuntimeException(data, topicType) - } - val newData = when (oldData) { - is Map<*, *> -> oldData + (data as Map) - is Collection<*> -> oldData + (data as Collection) - else -> throwRuntimeException(data, topicType) - } - storage[topicType] = newData - } - } - - fun remove(topicType: TopicType, topic: String) = runBlocking { - mutex.withLock { - val topicData = storage[topicType] ?: return@runBlocking - - val runtimeException = RuntimeException("Unsupported data $topicData for topic type $topicType") - val filteredData = when (topicData) { - is Map<*, *> -> topicData.filterKeys { it.toString() != topic } - is Collection<*> -> topicData.filter { it.toString() != topic } - else -> throw runtimeException - } - - storage[topicType] = filteredData - } - } - - fun getTopicType(topic: String) = runBlocking { - TopicType.values() - .find { - mutex.withLock { - when (val topicData = storage[it]) { - is Map<*, *> -> topicData.containsKey(topic) - is Collection<*> -> topicData.contains(topic) - else -> false - } - } - } - } - - fun getTopics() = runBlocking { - TopicType.values() - .flatMap { - mutex.withLock { - when (val data = storage[it]) { - is Map<*, *> -> data.keys - is Collection<*> -> data.toSet() - else -> emptySet() - } - } - }.toSet() as Set - } - - fun setAll(topics: Topics) { - topics.asMap().forEach { (topicType, data) -> - set(topicType, data) - } - } - - fun getCypherTemplate(topic: String) = (storage.getOrDefault(TopicType.CYPHER, emptyMap()) as Map) - .let { it[topic] } - - fun getAll(): Map = Collections.unmodifiableMap(storage) - - fun getByTopicType(topicType: TopicType): Any? 
= storage[topicType] - -} \ No newline at end of file diff --git a/extended/src/main/kotlin/apoc/kafka/consumer/kafka/KafkaAutoCommitEventConsumer.kt b/extended/src/main/kotlin/apoc/kafka/consumer/kafka/KafkaAutoCommitEventConsumer.kt index 0632bdb45d..a6591312b6 100644 --- a/extended/src/main/kotlin/apoc/kafka/consumer/kafka/KafkaAutoCommitEventConsumer.kt +++ b/extended/src/main/kotlin/apoc/kafka/consumer/kafka/KafkaAutoCommitEventConsumer.kt @@ -1,7 +1,5 @@ package apoc.kafka.consumer.kafka -//import io.confluent.kafka.serializers.KafkaAvroDeserializer -import org.apache.avro.generic.GenericRecord import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.clients.consumer.OffsetAndMetadata @@ -53,8 +51,6 @@ open class KafkaAutoCommitEventConsumer(private val config: KafkaSinkConfigurati ErrorService.ErrorConfig.from(emptyMap()), { s, e -> log.error(s,e as Throwable) }) - // override fun invalidTopics(): List = config.sinkConfiguration.topics.invalid - private val isSeekSet = AtomicBoolean() val consumer: KafkaConsumer<*, *> = when { diff --git a/extended/src/main/kotlin/apoc/kafka/consumer/kafka/KafkaEventSink.kt b/extended/src/main/kotlin/apoc/kafka/consumer/kafka/KafkaEventSink.kt index 0cfc4f2dc2..79b99099db 100644 --- a/extended/src/main/kotlin/apoc/kafka/consumer/kafka/KafkaEventSink.kt +++ b/extended/src/main/kotlin/apoc/kafka/consumer/kafka/KafkaEventSink.kt @@ -1,47 +1,23 @@ package apoc.kafka.consumer.kafka -import apoc.kafka.config.StreamsConfig import apoc.kafka.consumer.StreamsEventConsumer import apoc.kafka.consumer.StreamsEventConsumerFactory -import apoc.kafka.consumer.StreamsEventSinkQueryExecution -//import apoc.kafka.consumer.StreamsSinkConfiguration -import apoc.kafka.consumer.StreamsTopicService -import apoc.kafka.consumer.utils.ConsumerUtils -import apoc.kafka.events.StreamsPluginStatus +import apoc.kafka.events.KafkaStatus import apoc.kafka.extensions.isDefaultDb -import apoc.kafka.utils.KafkaUtil -import apoc.kafka.utils.KafkaUtil.getInvalidTopicsError -import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.Job -import kotlinx.coroutines.cancelAndJoin -import kotlinx.coroutines.delay import kotlinx.coroutines.isActive -import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock -import org.apache.kafka.common.errors.WakeupException import org.neo4j.kernel.internal.GraphDatabaseAPI import org.neo4j.logging.Log -class KafkaEventSink(//private val config: Map, - //private val queryExecution: StreamsEventSinkQueryExecution, - // private val streamsTopicService: StreamsTopicService, - // private val log: Log, - private val db: GraphDatabaseAPI) { +class KafkaEventSink(private val db: GraphDatabaseAPI) { private val mutex = Mutex() - private lateinit var eventConsumer: KafkaEventConsumer private var job: Job? 
= null -// val streamsSinkConfiguration: StreamsSinkConfiguration = StreamsSinkConfiguration.from(configMap = config, -// dbName = db.databaseName(), isDefaultDb = db.isDefaultDb()) -// -// private val streamsConfig: StreamsSinkConfiguration = StreamsSinkConfiguration.from(configMap = config, -// dbName = db.databaseName(), isDefaultDb = db.isDefaultDb()) fun getEventConsumerFactory(): StreamsEventConsumerFactory { return object: StreamsEventConsumerFactory() { @@ -58,15 +34,15 @@ class KafkaEventSink(//private val config: Map, } } - fun status(): StreamsPluginStatus = runBlocking { + fun status(): KafkaStatus = runBlocking { mutex.withLock(job) { status(job) } } - private fun status(job: Job?): StreamsPluginStatus = when (job?.isActive) { - true -> StreamsPluginStatus.RUNNING - else -> StreamsPluginStatus.STOPPED + private fun status(job: Job?): KafkaStatus = when (job?.isActive) { + true -> KafkaStatus.RUNNING + else -> KafkaStatus.STOPPED } } diff --git a/extended/src/main/kotlin/apoc/kafka/consumer/kafka/KafkaSinkConfiguration.kt b/extended/src/main/kotlin/apoc/kafka/consumer/kafka/KafkaSinkConfiguration.kt index 79a5252d1e..4b5387f1d0 100644 --- a/extended/src/main/kotlin/apoc/kafka/consumer/kafka/KafkaSinkConfiguration.kt +++ b/extended/src/main/kotlin/apoc/kafka/consumer/kafka/KafkaSinkConfiguration.kt @@ -1,40 +1,24 @@ package apoc.kafka.consumer.kafka -//import io.confluent.kafka.serializers.KafkaAvroDeserializer import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.serialization.ByteArrayDeserializer -//import apoc.kafka.consumer.StreamsSinkConfiguration import apoc.kafka.extensions.toPointCase import apoc.kafka.utils.JSONUtils -import apoc.kafka.utils.KafkaUtil.getInvalidTopics import apoc.kafka.utils.KafkaUtil.validateConnection import java.util.Properties private const val kafkaConfigPrefix = "apoc.kafka." -//private val SUPPORTED_DESERIALIZER = listOf(ByteArrayDeserializer::class.java.name, KafkaAvroDeserializer::class.java.name) -private fun validateDeserializers(config: KafkaSinkConfiguration) { -// val key = if (!SUPPORTED_DESERIALIZER.contains(config.keyDeserializer)) { -// ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -// } else if (!SUPPORTED_DESERIALIZER.contains(config.valueDeserializer)) { -// ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -// } else { -// "" -// } -// if (key.isNotBlank()) { -// throw RuntimeException("The property `kafka.$key` contains an invalid deserializer. 
Supported deserializers are $SUPPORTED_DESERIALIZER") -// } -} +private fun validateDeserializers(config: KafkaSinkConfiguration) {} data class KafkaSinkConfiguration(val bootstrapServers: String = "localhost:9092", val keyDeserializer: String = "org.apache.kafka.common.serialization.ByteArrayDeserializer", val valueDeserializer: String = "org.apache.kafka.common.serialization.ByteArrayDeserializer", val groupId: String = "neo4j", val autoOffsetReset: String = "earliest", -// val sinkConfiguration: StreamsSinkConfiguration = StreamsSinkConfiguration(), val enableAutoCommit: Boolean = true, val asyncCommit: Boolean = false, val extraProperties: Map = emptyMap()) { @@ -44,12 +28,7 @@ data class KafkaSinkConfiguration(val bootstrapServers: String = "localhost:9092 fun from(cfg: Map, dbName: String, isDefaultDb: Boolean): KafkaSinkConfiguration { val kafkaCfg = create(cfg, dbName, isDefaultDb) validate(kafkaCfg) -// val invalidTopics = getInvalidTopics(kafkaCfg.asProperties(), kafkaCfg.sinkConfiguration.topics.allTopics()) -// return if (invalidTopics.isNotEmpty()) { -// kafkaCfg.copy(sinkConfiguration = StreamsSinkConfiguration.from(cfg, dbName, invalidTopics, isDefaultDb)) -// } else { return kafkaCfg -// } } // Visible for testing @@ -62,8 +41,6 @@ data class KafkaSinkConfiguration(val bootstrapServers: String = "localhost:9092 val keys = JSONUtils.asMap(default).keys.map { it.toPointCase() } val extraProperties = config.filterKeys { !keys.contains(it) } -// val streamsSinkConfiguration = StreamsSinkConfiguration.from(configMap = cfg, dbName = dbName, isDefaultDb = isDefaultDb) - return default.copy(keyDeserializer = config.getOrDefault(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, default.keyDeserializer), valueDeserializer = config.getOrDefault(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, default.valueDeserializer), @@ -72,7 +49,6 @@ data class KafkaSinkConfiguration(val bootstrapServers: String = "localhost:9092 groupId = config.getOrDefault(ConsumerConfig.GROUP_ID_CONFIG, default.groupId) + (if (isDefaultDb) "" else "-$dbName"), enableAutoCommit = config.getOrDefault(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, default.enableAutoCommit).toString().toBoolean(), asyncCommit = config.getOrDefault("async.commit", default.asyncCommit).toString().toBoolean(), -// sinkConfiguration = streamsSinkConfiguration, extraProperties = extraProperties // for what we don't provide a default configuration ) } diff --git a/extended/src/main/kotlin/apoc/kafka/consumer/procedures/QueueBasedSpliterator.kt b/extended/src/main/kotlin/apoc/kafka/consumer/procedures/QueueBasedSpliterator.kt deleted file mode 100644 index 315ae49201..0000000000 --- a/extended/src/main/kotlin/apoc/kafka/consumer/procedures/QueueBasedSpliterator.kt +++ /dev/null @@ -1,67 +0,0 @@ -package apoc.kafka.consumer.procedures - -import org.neo4j.graphdb.NotInTransactionException -import org.neo4j.graphdb.TransactionTerminatedException -import org.neo4j.procedure.TerminationGuard -import java.util.Spliterator -import java.util.concurrent.BlockingQueue -import java.util.concurrent.TimeUnit -import java.util.function.Consumer - -/** - * @author mh - * @since 08.05.16 in APOC - */ -class QueueBasedSpliterator constructor(private val queue: BlockingQueue, - private val tombstone: T, - private val terminationGuard: TerminationGuard, - private val timeout: Long = 10) : Spliterator { - private var entry: T? 
- - init { - entry = poll() - } - - override fun tryAdvance(action: Consumer): Boolean { - if (transactionIsTerminated(terminationGuard)) return false - if (isEnd) return false - action.accept(entry) - entry = poll() - return !isEnd - } - - private fun transactionIsTerminated(terminationGuard: TerminationGuard): Boolean { - return try { - terminationGuard.check() - false - } catch (e: Exception) { - when (e) { - is TransactionTerminatedException, is NotInTransactionException -> true - else -> throw e - } - } - } - - private val isEnd: Boolean - private get() = entry == null || entry === tombstone - - private fun poll(): T? { - return try { - queue.poll(timeout, TimeUnit.SECONDS) - } catch (e: InterruptedException) { - null - } - } - - override fun trySplit(): Spliterator? { - return null - } - - override fun estimateSize(): Long { - return Long.MAX_VALUE - } - - override fun characteristics(): Int { - return Spliterator.NONNULL - } -} \ No newline at end of file diff --git a/extended/src/main/kotlin/apoc/kafka/consumer/procedures/StreamsSinkProcedures.kt b/extended/src/main/kotlin/apoc/kafka/consumer/procedures/StreamsSinkProcedures.kt index eb10845379..0c9cafa8c4 100644 --- a/extended/src/main/kotlin/apoc/kafka/consumer/procedures/StreamsSinkProcedures.kt +++ b/extended/src/main/kotlin/apoc/kafka/consumer/procedures/StreamsSinkProcedures.kt @@ -2,17 +2,14 @@ package apoc.kafka.consumer.procedures import apoc.kafka.config.StreamsConfig import apoc.kafka.consumer.StreamsEventConsumer -//import apoc.kafka.consumer.StreamsSinkConfiguration import apoc.kafka.consumer.kafka.KafkaEventSink -import apoc.kafka.events.StreamsPluginStatus -import apoc.kafka.extensions.isDefaultDb import apoc.kafka.utils.KafkaUtil import apoc.kafka.utils.KafkaUtil.checkEnabled +import apoc.util.QueueBasedSpliterator import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking -import org.apache.commons.lang3.exception.ExceptionUtils import org.neo4j.graphdb.GraphDatabaseService import org.neo4j.kernel.internal.GraphDatabaseAPI import org.neo4j.logging.Log @@ -24,7 +21,6 @@ import org.neo4j.procedure.Procedure import org.neo4j.procedure.TerminationGuard import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.ConcurrentHashMap -import java.util.stream.Collectors import java.util.stream.Stream import java.util.stream.StreamSupport @@ -94,7 +90,7 @@ class StreamsSinkProcedures { log?.debug("Data retrieved from topic $topic after $timeout milliseconds: $data") } - return StreamSupport.stream(QueueBasedSpliterator(data, tombstone, terminationGuard!!, timeout), false) + return StreamSupport.stream(QueueBasedSpliterator(data, tombstone, terminationGuard, timeout.toInt()), false) } private fun createConsumer(consumerConfig: Map, topic: String): StreamsEventConsumer = runBlocking { @@ -118,8 +114,5 @@ class StreamsSinkProcedures { fun unregisterStreamsEventSink(db: GraphDatabaseAPI) = streamsEventSinkStore.remove(KafkaUtil.getName(db)) - fun hasStatus(db: GraphDatabaseAPI, status: StreamsPluginStatus) = getStreamsEventSink(db)?.status() == status - - fun isRegistered(db: GraphDatabaseAPI) = getStreamsEventSink(db) != null } } \ No newline at end of file diff --git a/extended/src/main/kotlin/apoc/kafka/events/KafkaStatus.kt b/extended/src/main/kotlin/apoc/kafka/events/KafkaStatus.kt new file mode 100644 index 0000000000..402ee14072 --- /dev/null +++ b/extended/src/main/kotlin/apoc/kafka/events/KafkaStatus.kt @@ -0,0 +1,3 @@ 
+package apoc.kafka.events + +enum class KafkaStatus { RUNNING, STOPPED, UNKNOWN } \ No newline at end of file diff --git a/extended/src/main/kotlin/apoc/kafka/events/StreamsPluginStatus.kt b/extended/src/main/kotlin/apoc/kafka/events/StreamsPluginStatus.kt deleted file mode 100644 index 4eb3997889..0000000000 --- a/extended/src/main/kotlin/apoc/kafka/events/StreamsPluginStatus.kt +++ /dev/null @@ -1,3 +0,0 @@ -package apoc.kafka.events - -enum class StreamsPluginStatus { RUNNING, STOPPED, UNKNOWN } \ No newline at end of file diff --git a/extended/src/main/kotlin/apoc/kafka/extensions/CommonExtensions.kt b/extended/src/main/kotlin/apoc/kafka/extensions/CommonExtensions.kt index 4c975816da..4a6f109e7d 100644 --- a/extended/src/main/kotlin/apoc/kafka/extensions/CommonExtensions.kt +++ b/extended/src/main/kotlin/apoc/kafka/extensions/CommonExtensions.kt @@ -15,6 +15,15 @@ import java.nio.ByteBuffer import java.util.* import javax.lang.model.SourceVersion +private fun convertData(data: Any?, stringWhenFailure: Boolean = false): Any? { + return when (data) { + null -> null + is ByteArray -> JSONUtils.readValue(data, Any::class.java) + is GenericRecord -> data.toMap() + else -> if (stringWhenFailure) data.toString() else throw RuntimeException("Unsupported type ${data::class.java.name}") + } +} + fun Map.getInt(name:String, defaultValue: Int) = this.get(name)?.toInt() ?: defaultValue fun Map<*, *>.asProperties() = this.let { val properties = Properties() @@ -66,14 +75,7 @@ fun IndexedRecord.toMap() = this.schema.fields fun Schema.toMap() = JSONUtils.asMap(this.toString()) -private fun convertData(data: Any?, stringWhenFailure: Boolean = false): Any? { - return when (data) { - null -> null - is ByteArray -> JSONUtils.readValue(data, Any::class.java) - is GenericRecord -> data.toMap() - else -> if (stringWhenFailure) data.toString() else throw RuntimeException("Unsupported type ${data::class.java.name}") - } -} + fun ConsumerRecord<*, *>.toStreamsSinkEntity(): StreamsSinkEntity { val key = convertData(this.key(), true) val value = convertData(this.value()) diff --git a/extended/src/main/kotlin/apoc/kafka/producer/events/PreviousTransactionData.kt b/extended/src/main/kotlin/apoc/kafka/producer/events/PreviousTransactionData.kt deleted file mode 100644 index fe9ab224b3..0000000000 --- a/extended/src/main/kotlin/apoc/kafka/producer/events/PreviousTransactionData.kt +++ /dev/null @@ -1,294 +0,0 @@ -package apoc.kafka.producer.events - -import apoc.kafka.events.* -import org.neo4j.graphdb.Node -import org.neo4j.graphdb.Relationship -import org.neo4j.graphdb.event.LabelEntry -import org.neo4j.graphdb.event.PropertyEntry -import apoc.kafka.extensions.labelNames -import apoc.kafka.utils.KafkaUtil.getNodeKeys - -data class PreviousNodeTransactionData(val nodeProperties: Map>, - val nodeLabels: Map>, - val updatedPayloads: List = emptyList(), - val createdPayload: List, - val deletedPayload: List) - -data class PreviousRelTransactionData(val relProperties: Map> = emptyMap(), - val updatedPayloads: List = emptyList(), - val createdPayload: List = emptyList(), - val deletedPayload: List = emptyList()) - -data class PreviousTransactionData(val nodeData: PreviousNodeTransactionData, - val relData: PreviousRelTransactionData, - val nodeConstraints: Map>, - val relConstraints: Map>) - - -/** - * Build a data class containing the previous (before) state of the nodes/relationships - */ -class PreviousTransactionDataBuilder { - - //nodes - private var nodeProperties : Map> = emptyMap() - private var nodeLabels: 
Map> = emptyMap() - private var updatedNodes : Set = emptySet() - private var nodeCreatedPayload: Map = emptyMap() - private var nodeDeletedPayload: Map = emptyMap() - private var deletedLabels: Map> = emptyMap() - - //relationships - private var relProperties : Map> = emptyMap() - private var updatedRels : Set = emptySet() - private var relCreatedPayload: Map = emptyMap() - private var relDeletedPayload: Map = emptyMap() - private var relRoutingTypesAndStrategies: Map = emptyMap() - - private lateinit var nodeConstraints: Map> - private lateinit var relConstraints: Map> - - fun withNodeConstraints(nodeConstraints: Map>): PreviousTransactionDataBuilder { - this.nodeConstraints = nodeConstraints - return this - } - - fun withRelConstraints(relConstraints: Map>): PreviousTransactionDataBuilder { - this.relConstraints = relConstraints - return this - } - - fun build() : PreviousTransactionData { - val createdNodeIds = nodeCreatedPayload.keys - - val updatedPayloads = updatedNodes - .filter { ! createdNodeIds.contains(it.id.toString()) } - .map { - val labelsBefore = nodeLabels.getOrDefault(it.id, it.labelNames()) - val propsBefore = nodeProperties.getOrDefault(it.id, emptyMap()) - - val beforeNode = NodeChangeBuilder() - .withLabels(labelsBefore) - .withProperties(propsBefore) - .build() - - val labelsAfter = it.labelNames() - - val afterNode = NodeChangeBuilder() - .withLabels(labelsAfter) - .withProperties(it.allProperties) - .build() - - val payload = NodePayloadBuilder() - .withId(it.id.toString()) - .withBefore(beforeNode) - .withAfter(afterNode) - .build() - - payload - } - - val nodeData = PreviousNodeTransactionData(nodeProperties, nodeLabels, - updatedPayloads, nodeCreatedPayload.values.toList(), nodeDeletedPayload.values.toList()) - - val notUpdatedRels = (relCreatedPayload.keys + relDeletedPayload.keys).toSet() - - val nodeConstraintsCache = mutableMapOf, List>() - - val updatedRelPayloads = updatedRels - .filter { ! 
notUpdatedRels.contains(it.id.toString()) } - .map { - val propsBefore = relProperties.getOrDefault(it.id, emptyMap()) - - val beforeNode = RelationshipChangeBuilder() - .withProperties(propsBefore) - .build() - - val afterNode = RelationshipChangeBuilder() - .withProperties(it.allProperties) - .build() - - val startLabels = it.startNode.labelNames() - val startNodeConstraints = nodeConstraintsCache.computeIfAbsent(startLabels) { - nodeConstraints - .filterKeys { startLabels.contains(it) } - .flatMap { it.value } - } - val relKeyStrategy = relRoutingTypesAndStrategies.getOrDefault(it.type.name(), - RelKeyStrategy.DEFAULT - ) - - val startNodeKeys = getNodeKeys(startLabels, it.startNode.propertyKeys.toSet(), startNodeConstraints, relKeyStrategy) - .toTypedArray() - - - val endLabels = it.endNode.labelNames() - val endNodeConstraints = nodeConstraintsCache.computeIfAbsent(endLabels) { - nodeConstraints - .filterKeys { endLabels.contains(it) } - .flatMap { it.value } - } - val endNodeKeys = getNodeKeys(endLabels, it.endNode.propertyKeys.toSet(), endNodeConstraints, relKeyStrategy) - .toTypedArray() - - val payload = RelationshipPayloadBuilder() - .withId(it.id.toString()) - .withName(it.type.name()) - .withStartNode(it.startNode.id.toString(), startLabels, it.startNode.getProperties(*startNodeKeys)) - .withEndNode(it.endNode.id.toString(), endLabels, it.endNode.getProperties(*endNodeKeys)) - .withBefore(beforeNode) - .withAfter(afterNode) - .build() - - payload - } - - val relData = PreviousRelTransactionData(createdPayload = this.relCreatedPayload.values.toList(), - deletedPayload = this.relDeletedPayload.values.toList(), - updatedPayloads = updatedRelPayloads) - - return PreviousTransactionData(nodeData = nodeData, relData = relData, nodeConstraints = nodeConstraints, relConstraints = relConstraints) - } - - fun withLabels(assignedLabels: Iterable, removedLabels: Iterable): PreviousTransactionDataBuilder { - val assignedPreviousLabels = assignedLabels - .map { labelEntry -> Pair(labelEntry.node().id, labelEntry.node().labels.filter { it != labelEntry.label() }.map { it.name() }.toList()) } // [ (nodeId, [label]) ] - .groupBy({it.first},{it.second}) // { nodeId -> [ [label] ] } - .mapValues { it.value.flatten() } // { nodeId -> [label] } - - val removedPreviousLabels = removedLabels - .map { labelEntry -> Pair(labelEntry.node().id, labelEntry.node().labelNames().toList().plus(labelEntry.label().name())) } // [ (nodeId, [label]) ] - .groupBy({it.first},{it.second}) // { nodeId -> [ [label] ] } - .mapValues { it.value.flatten() } // { nodeId -> [label] } - - - updatedNodes = updatedNodes.plus(assignedLabels - .map { it.node() } - .toSet() ) - - updatedNodes = updatedNodes.plus(removedLabels - .map { it.node() } - .toSet() ) - - nodeLabels = assignedPreviousLabels.plus(removedPreviousLabels) - - val allProps = mutableMapOf>() - updatedNodes.forEach { - allProps.putIfAbsent(it.id, it.allProperties) - } - - nodeProperties = nodeProperties.plus(allProps) - - return this - } - - fun withNodeProperties(assignedNodeProperties: Iterable>, removedNodeProperties: Iterable>): PreviousTransactionDataBuilder { - val allProps = mutableMapOf>() - assignedNodeProperties.filter { it.previouslyCommittedValue() == null } - .forEach { - var props = allProps.getOrDefault(it.entity().id, it.entity().allProperties.toMutableMap()) - props.remove(it.key()) - allProps.putIfAbsent(it.entity().id, props) - } - - assignedNodeProperties.filter { it.previouslyCommittedValue() != null } - .forEach { - var props = 
allProps.getOrDefault(it.entity().id, it.entity().allProperties.toMutableMap()) - props.put(it.key(), it.previouslyCommittedValue()) - allProps.putIfAbsent(it.entity().id, props) - } - - removedNodeProperties.forEach { - var props = allProps.getOrDefault(it.entity().id, it.entity().allProperties.toMutableMap()) - props.put(it.key(), it.previouslyCommittedValue()) - allProps.putIfAbsent(it.entity().id, props) - } - - updatedNodes = updatedNodes.plus(assignedNodeProperties - .map { it.entity() } - .toSet() ) - - updatedNodes = updatedNodes.plus(removedNodeProperties - .map { it.entity() } - .toSet() ) - - nodeProperties = nodeProperties.plus(allProps) - - return this - } - - fun withNodeCreatedPayloads(createdPayload: Map): PreviousTransactionDataBuilder { - this.nodeCreatedPayload = createdPayload - return this - } - - fun withNodeDeletedPayloads(deletedPayload: Map): PreviousTransactionDataBuilder { - this.nodeDeletedPayload = deletedPayload - return this - } - - fun withRelCreatedPayloads(createdPayload: Map): PreviousTransactionDataBuilder { - this.relCreatedPayload = createdPayload - return this - } - - fun withRelRoutingTypesAndStrategies(relRoutingTypesAndStrategies: Map): PreviousTransactionDataBuilder { - this.relRoutingTypesAndStrategies = relRoutingTypesAndStrategies - return this - } - - fun withRelDeletedPayloads(deletedPayload: Map): PreviousTransactionDataBuilder { - this.relDeletedPayload = deletedPayload - return this - } - - fun withRelProperties(assignedRelProperties: Iterable>, removedRelProperties: Iterable>): PreviousTransactionDataBuilder { - val allProps = mutableMapOf>() - assignedRelProperties.filter { it.previouslyCommittedValue() == null } - .forEach { - var props = allProps.getOrDefault(it.entity().id, it.entity().allProperties.toMutableMap()) - props.remove(it.key()) - allProps.putIfAbsent(it.entity().id, props) - } - - assignedRelProperties.filter { it.previouslyCommittedValue() != null } - .forEach { - var props = allProps.getOrDefault(it.entity().id, it.entity().allProperties.toMutableMap()) - props.put(it.key(), it.previouslyCommittedValue()) - allProps.putIfAbsent(it.entity().id, props) - } - - removedRelProperties.forEach { - var props = allProps.getOrDefault(it.entity().id, it.entity().allProperties.toMutableMap()) - props.put(it.key(), it.previouslyCommittedValue()) - allProps.putIfAbsent(it.entity().id, props) - } - - updatedRels = updatedRels.plus(assignedRelProperties - .map { it.entity() } - .toSet() ) - - updatedRels = updatedRels.plus(removedRelProperties - .map { it.entity() } - .toSet() ) - - relProperties = relProperties.plus(allProps) - - return this - } - - fun withDeletedLabels(deletedLabels: Map>): PreviousTransactionDataBuilder { - this.deletedLabels = deletedLabels - return this - } - - fun deletedLabels(id : Long): List{ - return this.deletedLabels.getOrDefault(id, emptyList()) - } - - fun nodeDeletedPayload(id: Long): NodePayload? 
{ - return this.nodeDeletedPayload[id.toString()] - } - - -} \ No newline at end of file diff --git a/extended/src/main/kotlin/apoc/kafka/producer/kafka/KafkaEventRouter.kt b/extended/src/main/kotlin/apoc/kafka/producer/kafka/KafkaEventRouter.kt index c54234d53f..b09f28e6b2 100644 --- a/extended/src/main/kotlin/apoc/kafka/producer/kafka/KafkaEventRouter.kt +++ b/extended/src/main/kotlin/apoc/kafka/producer/kafka/KafkaEventRouter.kt @@ -1,7 +1,7 @@ package apoc.kafka.producer.kafka import apoc.kafka.events.StreamsEvent -import apoc.kafka.events.StreamsPluginStatus +import apoc.kafka.events.KafkaStatus import apoc.kafka.events.StreamsTransactionEvent import apoc.kafka.extensions.isDefaultDb //import apoc.kafka.producer.StreamsEventRouter @@ -11,7 +11,6 @@ import apoc.kafka.producer.asSourceRecordValue import apoc.kafka.producer.toMap import apoc.kafka.utils.JSONUtils import apoc.kafka.utils.KafkaUtil -import apoc.kafka.utils.KafkaUtil.getInvalidTopicsError import kotlinx.coroutines.runBlocking import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock @@ -40,14 +39,14 @@ class KafkaEventRouter(private val config: Map, private val kafkaConfig by lazy { KafkaConfiguration.from(config, log) } private val kafkaAdminService by lazy { KafkaAdminService(kafkaConfig, log) } - private fun status(producer: Neo4jKafkaProducer<*, *>?): StreamsPluginStatus = when (producer != null) { - true -> StreamsPluginStatus.RUNNING - else -> StreamsPluginStatus.STOPPED + private fun status(producer: Neo4jKafkaProducer<*, *>?): KafkaStatus = when (producer != null) { + true -> KafkaStatus.RUNNING + else -> KafkaStatus.STOPPED } /*override*/ fun start() = runBlocking { mutex.withLock(producer) { - if (status(producer) == StreamsPluginStatus.RUNNING) { + if (status(producer) == KafkaStatus.RUNNING) { return@runBlocking } log.info("Initialising Kafka Connector") @@ -61,7 +60,7 @@ class KafkaEventRouter(private val config: Map, /*override*/ fun stop() = runBlocking { mutex.withLock(producer) { - if (status(producer) == StreamsPluginStatus.STOPPED) { + if (status(producer) == KafkaStatus.STOPPED) { return@runBlocking } KafkaUtil.ignoreExceptions({ producer?.flush() }, UninitializedPropertyAccessException::class.java)