From 82b16ec5d83295c676736c2caf69103de0b81770 Mon Sep 17 00:00:00 2001 From: vga91 Date: Tue, 3 Sep 2024 14:22:57 +0200 Subject: [PATCH] code cleanup --- extended/build.gradle | 14 +- .../main/kotlin/apoc/kafka/KafkaHandler.kt | 2 - .../apoc/kafka/Neo4jStreamsStrategyStorage.kt | 43 ----- .../apoc/kafka/events/ProcedureResults.kt | 4 - .../kotlin/apoc/kafka/events/StreamsEvent.kt | 1 - .../producer/procedures/StreamsProcedures.kt | 117 ------------ .../sink/strategy/CypherTemplateStrategy.kt | 18 -- .../kotlin/apoc/kafka/utils/Neo4jUtils.kt | 1 - .../apoc/custom/CypherProcedureTestUtil.java | 6 +- .../custom/CypherProceduresStorageTest.java | 7 +- .../src/test/java/apoc/ml/VertexAIIT.java | 5 +- .../consumer/kafka/KafkaEventSinkAvroTSE.kt | 1 - .../consumer/kafka/KafkaEventSinkBaseTSE.kt | 7 +- .../consumer/kafka/KafkaEventSinkCommitTSE.kt | 3 - .../KafkaEventSinkNoTopicAutocreationIT.kt | 23 +-- .../consumer/kafka/KafkaEventSinkSimpleTSE.kt | 10 - .../kafka/KafkaStreamsSinkProceduresTSE.kt | 64 ------- .../consumer/kafka/SchemaRegistryContainer.kt | 4 - .../StreamsEventSinkQueryExecutionTest.kt | 73 -------- .../streams/StreamsSinkConfigurationTest.kt | 134 -------------- .../streams/StreamsTopicServiceTest.kt | 80 -------- .../kafka/KafkaSinkConfigurationTest.kt | 175 ------------------ .../KafkaEventRouterCompactionStrategyTSE.kt | 1 - .../KafkaEventRouterNoTopicAutocreationIT.kt | 70 ------- .../apoc/nlp/aws/AWSProceduresAPITest.kt | 0 25 files changed, 13 insertions(+), 850 deletions(-) delete mode 100644 extended/src/main/kotlin/apoc/kafka/Neo4jStreamsStrategyStorage.kt delete mode 100644 extended/src/main/kotlin/apoc/kafka/events/ProcedureResults.kt delete mode 100644 extended/src/main/kotlin/apoc/kafka/producer/procedures/StreamsProcedures.kt delete mode 100644 extended/src/main/kotlin/apoc/kafka/service/sink/strategy/CypherTemplateStrategy.kt delete mode 100644 extended/src/main/kotlin/apoc/kafka/utils/Neo4jUtils.kt delete mode 100644 extended/src/test/kotlin/apoc/kafka/consumer/streams/StreamsEventSinkQueryExecutionTest.kt delete mode 100644 extended/src/test/kotlin/apoc/kafka/consumer/streams/StreamsSinkConfigurationTest.kt delete mode 100644 extended/src/test/kotlin/apoc/kafka/consumer/streams/StreamsTopicServiceTest.kt delete mode 100644 extended/src/test/kotlin/apoc/kafka/consumer/streams/kafka/KafkaSinkConfigurationTest.kt delete mode 100644 extended/src/test/kotlin/apoc/kafka/producer/integrations/KafkaEventRouterNoTopicAutocreationIT.kt delete mode 100644 extended/src/test/kotlin/apoc/nlp/aws/AWSProceduresAPITest.kt diff --git a/extended/build.gradle b/extended/build.gradle index f0c444d8fc..22d8db5340 100644 --- a/extended/build.gradle +++ b/extended/build.gradle @@ -26,7 +26,7 @@ jar { } compileKotlin { - kotlinOptions.jvmTarget = JavaVersion.VERSION_17 + kotlinOptions.jvmTarget = "17" } generateGrammarSource { @@ -62,7 +62,6 @@ dependencies { exclude group: 'org.abego.treelayout' } - def kotlinVersion = "1.6.0" def kafkaVersion = "2.4.0" def jacksonVersion = "2.17.2" @@ -109,10 +108,9 @@ dependencies { } compileOnly group: 'com.couchbase.client', name: 'java-client', version: '3.3.0', withoutJacksons compileOnly group: 'io.lettuce', name: 'lettuce-core', version: '6.1.1.RELEASE' - compileOnly group: 'com.fasterxml.jackson.module', name: 'jackson-module-kotlin', version: jacksonVersion - testImplementation group: 'com.fasterxml.jackson.module', name: 'jackson-module-kotlin', version: jacksonVersion - compileOnly group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: jacksonVersion - testImplementation group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: jacksonVersion + compileOnly group: 'com.fasterxml.jackson.module', name: 'jackson-module-kotlin', version: jacksonVersion, withoutJacksons + compileOnly group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: jacksonVersion, withoutJacksons + compileOnly group: 'com.amazonaws', name: 'aws-java-sdk-s3', version: '1.11.270' compileOnly group: 'com.amazonaws', name: 'aws-java-sdk-comprehend', version: '1.12.353' , withoutJacksons compileOnly group: 'com.sun.mail', name: 'javax.mail', version: '1.6.0' @@ -130,6 +128,8 @@ dependencies { compileOnly group: 'com.github.conker84', name: 'neo4j-configuration-lifecycle', version: 'ad59084711' compileOnly group: 'io.confluent', name: 'kafka-avro-serializer', version: '5.2.2' + testImplementation group: 'com.fasterxml.jackson.module', name: 'jackson-module-kotlin', version: jacksonVersion + testImplementation group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: jacksonVersion testImplementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.16.1' testImplementation group: 'org.apache.arrow', name: 'arrow-vector', version: '13.0.0' testImplementation group: 'org.apache.arrow', name: 'arrow-memory-netty', version: '13.0.0' @@ -159,8 +159,6 @@ dependencies { testImplementation group: 'org.apache.parquet', name: 'parquet-hadoop', version: '1.13.1', withoutServers testImplementation group: 'com.opencsv', name: 'opencsv', version: '5.7.1' testImplementation group: 'org.jetbrains.kotlinx', name: 'kotlinx-coroutines-core', version: '1.4.2' -// testImplementation group: 'org.jetbrains.kotlin', name: 'kotlin-test-junit', version: kotlinVersion -// testImplementation group: 'org.jetbrains.kotlin', name: 'kotlin-test-junit5', version: kotlinVersion testImplementation group: 'org.jetbrains.kotlin', name: 'kotlin-test', version: '1.6.0' diff --git a/extended/src/main/kotlin/apoc/kafka/KafkaHandler.kt b/extended/src/main/kotlin/apoc/kafka/KafkaHandler.kt index 973950676b..b508410a66 100644 --- a/extended/src/main/kotlin/apoc/kafka/KafkaHandler.kt +++ b/extended/src/main/kotlin/apoc/kafka/KafkaHandler.kt @@ -21,7 +21,6 @@ class KafkaHandler(): LifecycleAdapter() { override fun start() { if(ApocConfig.apocConfig().getBoolean(APOC_KAFKA_ENABLED)) { -// println("start db......") try { StreamsRouterConfigurationListener(db, log) @@ -41,7 +40,6 @@ class KafkaHandler(): LifecycleAdapter() { override fun stop() { if(ApocConfig.apocConfig().getBoolean(APOC_KAFKA_ENABLED)) { -// println("stop db..........") StreamsRouterConfigurationListener(db, log).shutdown() StreamsSinkConfigurationListener(db, log).shutdown() diff --git a/extended/src/main/kotlin/apoc/kafka/Neo4jStreamsStrategyStorage.kt b/extended/src/main/kotlin/apoc/kafka/Neo4jStreamsStrategyStorage.kt deleted file mode 100644 index f415f5cfcd..0000000000 --- a/extended/src/main/kotlin/apoc/kafka/Neo4jStreamsStrategyStorage.kt +++ /dev/null @@ -1,43 +0,0 @@ -//package apoc.kafka -// -//import apoc.kafka.consumer.StreamsSinkConfiguration -//import apoc.kafka.consumer.StreamsTopicService -//import apoc.kafka.extensions.isDefaultDb -//import apoc.kafka.service.StreamsStrategyStorage -//import apoc.kafka.service.TopicType -//import apoc.kafka.service.sink.strategy.* -//import org.neo4j.graphdb.GraphDatabaseService -// -//class Neo4jStreamsStrategyStorage(private val streamsTopicService: StreamsTopicService, -// private val streamsConfig: Map, -// private val db: GraphDatabaseService): StreamsStrategyStorage() { -// -// override fun getTopicType(topic: String): TopicType? { -// return streamsTopicService.getTopicType(topic) -// } -// -// private fun getTopicsByTopicType(topicType: TopicType): T = streamsTopicService.getByTopicType(topicType) as T -// -// override fun getStrategy(topic: String): IngestionStrategy = when (val topicType = getTopicType(topic)) { -// TopicType.CDC_SOURCE_ID -> { -// val strategyConfig = StreamsSinkConfiguration -// .createSourceIdIngestionStrategyConfig(streamsConfig, db.databaseName(), db.isDefaultDb()) -// SourceIdIngestionStrategy(strategyConfig) -// } -// TopicType.CDC_SCHEMA -> SchemaIngestionStrategy() -// TopicType.CUD -> CUDIngestionStrategy() -// TopicType.PATTERN_NODE -> { -// val map = getTopicsByTopicType>(topicType) -// NodePatternIngestionStrategy(map.getValue(topic)) -// } -// TopicType.PATTERN_RELATIONSHIP -> { -// val map = getTopicsByTopicType>(topicType) -// RelationshipPatternIngestionStrategy(map.getValue(topic)) -// } -// TopicType.CYPHER -> { -// CypherTemplateStrategy(streamsTopicService.getCypherTemplate(topic)!!) -// } -// else -> throw RuntimeException("Topic Type not Found") -// } -// -//} \ No newline at end of file diff --git a/extended/src/main/kotlin/apoc/kafka/events/ProcedureResults.kt b/extended/src/main/kotlin/apoc/kafka/events/ProcedureResults.kt deleted file mode 100644 index 11e0c76b89..0000000000 --- a/extended/src/main/kotlin/apoc/kafka/events/ProcedureResults.kt +++ /dev/null @@ -1,4 +0,0 @@ -package apoc.kafka.events - -class StreamResult(@JvmField val event: Map) -class KeyValueResult(@JvmField val name: String, @JvmField val value: Any?) \ No newline at end of file diff --git a/extended/src/main/kotlin/apoc/kafka/events/StreamsEvent.kt b/extended/src/main/kotlin/apoc/kafka/events/StreamsEvent.kt index b7573a9eb6..94067cf6c7 100644 --- a/extended/src/main/kotlin/apoc/kafka/events/StreamsEvent.kt +++ b/extended/src/main/kotlin/apoc/kafka/events/StreamsEvent.kt @@ -1,6 +1,5 @@ package apoc.kafka.events -import org.neo4j.graphdb.schema.ConstraintType enum class OperationType { created, updated, deleted } diff --git a/extended/src/main/kotlin/apoc/kafka/producer/procedures/StreamsProcedures.kt b/extended/src/main/kotlin/apoc/kafka/producer/procedures/StreamsProcedures.kt deleted file mode 100644 index fb7821639d..0000000000 --- a/extended/src/main/kotlin/apoc/kafka/producer/procedures/StreamsProcedures.kt +++ /dev/null @@ -1,117 +0,0 @@ -//package apoc.kafka.producer.procedures -// -//import kotlinx.coroutines.runBlocking -//import org.neo4j.graphdb.GraphDatabaseService -//import org.neo4j.kernel.internal.GraphDatabaseAPI -//import org.neo4j.logging.Log -//import org.neo4j.procedure.Context -//import org.neo4j.procedure.Description -//import org.neo4j.procedure.Mode -//import org.neo4j.procedure.Name -//import org.neo4j.procedure.Procedure -//import apoc.kafka.producer.StreamsEventRouter -//import apoc.kafka.producer.StreamsTransactionEventHandler -//import apoc.kafka.producer.events.StreamsEventBuilder -//import apoc.kafka.utils.StreamsUtils -//import java.util.concurrent.ConcurrentHashMap -//import java.util.stream.Stream -// -//data class StreamPublishResult(@JvmField val value: Map) -// -//data class StreamsEventSinkStoreEntry(val eventRouter: StreamsEventRouter, -// val txHandler: StreamsTransactionEventHandler -//) -//class StreamsProcedures { -// -// @JvmField @Context -// var db: GraphDatabaseService? = null -// -// @JvmField @Context var log: Log? = null -// -// @Procedure(mode = Mode.READ, name = "apoc.kafka.publish.sync") -// @Description("apoc.kafka.publish.sync(topic, payload, config) - Allows custom synchronous streaming from Neo4j to the configured stream environment") -// fun sync(@Name("topic") topic: String?, @Name("payload") payload: Any?, -// @Name(value = "config", defaultValue = "{}") config: Map?): Stream { -// checkEnabled() -// if (isTopicNullOrEmpty(topic)) { -// return Stream.empty() -// } -// checkPayloadNotNull(payload) -// -// val streamsEvent = buildStreamEvent(topic!!, payload!!) -// return getStreamsEventSinkStoreEntry().eventRouter -// .sendEventsSync(topic, listOf(streamsEvent), config ?: emptyMap()) -// .map { StreamPublishResult(it) } -// .stream() -// } -// -// @Procedure(mode = Mode.READ, name = "apoc.kafka.publish") -// @Description("apoc.kafka.publish(topic, payload, config) - Allows custom streaming from Neo4j to the configured stream environment") -// fun publish(@Name("topic") topic: String?, @Name("payload") payload: Any?, -// @Name(value = "config", defaultValue = "{}") config: Map?) = runBlocking { -// checkEnabled() -// if (isTopicNullOrEmpty(topic)) { -// return@runBlocking -// } -// checkPayloadNotNull(payload) -// -// val streamsEvent = buildStreamEvent(topic!!, payload!!) -// getStreamsEventSinkStoreEntry().eventRouter.sendEvents(topic, listOf(streamsEvent), config ?: emptyMap()) -// } -// -// private fun checkEnabled() { -// if (!getStreamsEventSinkStoreEntry().eventRouter.eventRouterConfiguration.proceduresEnabled) { -// throw RuntimeException("In order to use the procedure you must set apoc.kafka.procedures.enabled=true") -// } -// } -// -// private fun isTopicNullOrEmpty(topic: String?): Boolean { -// return if (topic.isNullOrEmpty()) { -// log?.info("Topic empty, no message sent") -// true -// } else { -// false -// } -// } -// -// private fun checkPayloadNotNull(payload: Any?) { -// if (payload == null) { -// log?.error("Payload empty, no message sent") -// throw RuntimeException("Payload may not be null") -// } -// } -// -// private fun buildStreamEvent(topic: String, payload: Any) = StreamsEventBuilder() -// .withPayload(payload) -// .withNodeRoutingConfiguration(getStreamsEventSinkStoreEntry() -// .eventRouter -// .eventRouterConfiguration -// .nodeRouting -// .firstOrNull { it.topic == topic }) -// .withRelationshipRoutingConfiguration(getStreamsEventSinkStoreEntry() -// .eventRouter -// .eventRouterConfiguration -// .relRouting -// .firstOrNull { it.topic == topic }) -// .withTopic(topic) -// .build() -// -// private fun getStreamsEventSinkStoreEntry() = streamsEventRouterStore[db!!.databaseName()]!! -// -// companion object { -// -// private val streamsEventRouterStore = ConcurrentHashMap() -// -// fun register( -// db: GraphDatabaseAPI, -// evtRouter: StreamsEventRouter, -// txHandler: StreamsTransactionEventHandler -// ) { -// streamsEventRouterStore[StreamsUtils.getName(db)] = StreamsEventSinkStoreEntry(evtRouter, txHandler) -// } -// -// fun unregister(db: GraphDatabaseAPI) { -// streamsEventRouterStore.remove(StreamsUtils.getName(db)) -// } -// } -//} diff --git a/extended/src/main/kotlin/apoc/kafka/service/sink/strategy/CypherTemplateStrategy.kt b/extended/src/main/kotlin/apoc/kafka/service/sink/strategy/CypherTemplateStrategy.kt deleted file mode 100644 index 7183cd3f6e..0000000000 --- a/extended/src/main/kotlin/apoc/kafka/service/sink/strategy/CypherTemplateStrategy.kt +++ /dev/null @@ -1,18 +0,0 @@ -package apoc.kafka.service.sink.strategy - -import apoc.kafka.service.StreamsSinkEntity -import apoc.kafka.utils.KafkaUtil - -class CypherTemplateStrategy(query: String): IngestionStrategy { - private val fullQuery = "${KafkaUtil.UNWIND} $query" - override fun mergeNodeEvents(events: Collection): List { - return listOf(QueryEvents(fullQuery, events.mapNotNull { it.value as? Map })) - } - - override fun deleteNodeEvents(events: Collection): List = emptyList() - - override fun mergeRelationshipEvents(events: Collection): List = emptyList() - - override fun deleteRelationshipEvents(events: Collection): List = emptyList() - -} \ No newline at end of file diff --git a/extended/src/main/kotlin/apoc/kafka/utils/Neo4jUtils.kt b/extended/src/main/kotlin/apoc/kafka/utils/Neo4jUtils.kt deleted file mode 100644 index 845f3f96a9..0000000000 --- a/extended/src/main/kotlin/apoc/kafka/utils/Neo4jUtils.kt +++ /dev/null @@ -1 +0,0 @@ -package apoc.kafka.utils diff --git a/extended/src/test/java/apoc/custom/CypherProcedureTestUtil.java b/extended/src/test/java/apoc/custom/CypherProcedureTestUtil.java index 0c22ddafef..8a627996b3 100644 --- a/extended/src/test/java/apoc/custom/CypherProcedureTestUtil.java +++ b/extended/src/test/java/apoc/custom/CypherProcedureTestUtil.java @@ -9,7 +9,6 @@ import java.io.IOException; import java.util.Map; -import static apoc.ExtendedApocConfig.APOC_KAFKA_ENABLED; import static apoc.custom.CypherProceduresHandler.*; import static apoc.util.DbmsTestUtil.startDbWithApocConfigs; import static apoc.util.SystemDbTestUtil.PROCEDURE_DEFAULT_REFRESH; @@ -21,10 +20,7 @@ public class CypherProcedureTestUtil { public final static String QUERY_CREATE = "RETURN $input1 + $input2 as answer"; public static DatabaseManagementService startDbWithCustomApocConfigs(TemporaryFolder storeDir) throws IOException { return startDbWithApocConfigs(storeDir, - Map.of( - CUSTOM_PROCEDURES_REFRESH, PROCEDURE_DEFAULT_REFRESH, - APOC_KAFKA_ENABLED, "true" - ) + Map.of(CUSTOM_PROCEDURES_REFRESH, PROCEDURE_DEFAULT_REFRESH) ); } diff --git a/extended/src/test/java/apoc/custom/CypherProceduresStorageTest.java b/extended/src/test/java/apoc/custom/CypherProceduresStorageTest.java index 2685e0ec02..40a85b8186 100644 --- a/extended/src/test/java/apoc/custom/CypherProceduresStorageTest.java +++ b/extended/src/test/java/apoc/custom/CypherProceduresStorageTest.java @@ -25,7 +25,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; -import static apoc.ExtendedApocConfig.APOC_KAFKA_ENABLED; +import static apoc.custom.CypherProcedureTestUtil.QUERY_CREATE; import static apoc.custom.CypherProceduresHandler.CUSTOM_PROCEDURES_REFRESH; import static apoc.util.DbmsTestUtil.startDbWithApocConfigs; import static apoc.util.MapUtil.map; @@ -57,10 +57,7 @@ public void setUp() throws Exception { final int refreshTime = 3000; // start db with apoc.conf: `apoc.custom.procedures.refresh=