Skip to content

Commit

Permalink
removed code unused by procedures
Browse files Browse the repository at this point in the history
  • Loading branch information
vga91 committed Oct 7, 2024
1 parent dab1549 commit 35bb9ea
Show file tree
Hide file tree
Showing 25 changed files with 97 additions and 1,358 deletions.
4 changes: 0 additions & 4 deletions extended/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,6 @@ dependencies {

compileOnly group: 'org.jetbrains.kotlinx', name: 'kotlinx-coroutines-core', version: '1.4.2'
compileOnly group: 'org.apache.kafka', name: 'kafka-clients', version: kafkaVersion
compileOnly group: 'com.github.conker84', name: 'neo4j-configuration-lifecycle', version: 'ad59084711'
compileOnly group: 'io.confluent', name: 'kafka-avro-serializer', version: '5.2.2'

testImplementation group: 'com.fasterxml.jackson.module', name: 'jackson-module-kotlin', version: jacksonVersion
testImplementation group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: jacksonVersion
Expand Down Expand Up @@ -163,9 +161,7 @@ dependencies {
testImplementation group: 'org.jetbrains.kotlin', name: 'kotlin-test', version: '1.6.0'

testImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: kafkaVersion
testImplementation group: 'io.confluent', name: 'kafka-avro-serializer', version: '5.2.2'
testImplementation group: 'org.testcontainers', name: 'kafka', version: testContainersVersion
testImplementation group: 'com.github.conker84', name: 'neo4j-configuration-lifecycle', version: 'ad59084711'

configurations.all {
exclude group: 'org.slf4j', module: 'slf4j-nop'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ public Map<String, Lifecycle> getServices(GraphDatabaseAPI db, ApocExtensionFact

serviceMap.put("cypherProcedures", cypherProcedureHandler);

if (dependencies.apocConfig().getBoolean(APOC_KAFKA_ENABLED)) {
boolean isKafkaEnabled = dependencies.apocConfig().getConfig().getBoolean(APOC_KAFKA_ENABLED, false);
if (isKafkaEnabled) {
try {
Class<?> kafkaHandlerClass = Class.forName("apoc.kafka.KafkaHandler");
Lifecycle kafkaHandler = (Lifecycle) kafkaHandlerClass
Expand Down
15 changes: 8 additions & 7 deletions extended/src/main/kotlin/apoc/kafka/PublishProcedures.kt
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package apoc.kafka

import apoc.kafka.producer.StreamsEventRouter
import apoc.kafka.producer.StreamsTransactionEventHandler
//import apoc.kafka.producer.StreamsEventRouter
//import apoc.kafka.producer.StreamsTransactionEventHandler
//import apoc.kafka.producer.StreamsTransactionEventHandler
import apoc.kafka.producer.events.StreamsEventBuilder
import apoc.kafka.producer.kafka.KafkaEventRouter
import apoc.kafka.utils.KafkaUtil
import apoc.kafka.utils.KafkaUtil.checkEnabled
import kotlinx.coroutines.runBlocking
Expand All @@ -19,8 +20,8 @@ import java.util.stream.Stream

data class StreamPublishResult(@JvmField val value: Map<String, Any>)

data class StreamsEventSinkStoreEntry(val eventRouter: StreamsEventRouter,
val txHandler: StreamsTransactionEventHandler
data class StreamsEventSinkStoreEntry(val eventRouter: KafkaEventRouter,
// val txHandler: StreamsTransactionEventHandler
)
class PublishProcedures {

Expand Down Expand Up @@ -99,10 +100,10 @@ class PublishProcedures {

fun register(
db: GraphDatabaseAPI,
evtRouter: StreamsEventRouter,
txHandler: StreamsTransactionEventHandler
evtRouter: KafkaEventRouter,
// txHandler: StreamsTransactionEventHandler
) {
streamsEventRouterStore[KafkaUtil.getName(db)] = StreamsEventSinkStoreEntry(evtRouter, txHandler)
streamsEventRouterStore[KafkaUtil.getName(db)] = StreamsEventSinkStoreEntry(evtRouter/*, txHandler*/)
}

fun unregister(db: GraphDatabaseAPI) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,6 @@ import apoc.kafka.events.StreamsPluginStatus
object StreamsEventSinkFactory {
fun getStreamsEventSink(config: Map<String, String>, //streamsQueryExecution: StreamsEventSinkQueryExecution,
/* streamsTopicService: StreamsTopicService, */log: Log, db: GraphDatabaseAPI): KafkaEventSink {
// return Class.forName(config.getOrDefault("apoc.kafka.sink", "apoc.kafka.consumer.kafka.KafkaEventSink"))
// .getConstructor(Map::class.java,
// StreamsEventSinkQueryExecution::class.java,
// StreamsTopicService::class.java,
// Log::class.java,
// GraphDatabaseAPI::class.java)
// .newInstance(config, streamsQueryExecution, streamsTopicService, log, db)
return KafkaEventSink(/*config, streamsQueryExecution, streamsTopicService, log, */db)
}
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -43,58 +43,16 @@ class StreamsSinkConfigurationListener(private val db: GraphDatabaseAPI,
}

fun start(configMap: Map<String, String>) {
// lastConfig = KafkaSinkConfiguration.create(StreamsConfig.getConfiguration(), db.databaseName(), db.isDefaultDb())
// val streamsSinkConfiguration = lastConfig!!.sinkConfiguration
// streamsTopicService.clearAll()
// streamsTopicService.setAll(streamsSinkConfiguration.topics)
//
// val neo4jStrategyStorage = Neo4jStreamsStrategyStorage(streamsTopicService, configMap, db)
// val streamsQueryExecution = StreamsEventSinkQueryExecution(db,
// log, neo4jStrategyStorage)
//

eventSink = StreamsEventSinkFactory
.getStreamsEventSink(configMap,
// streamsQueryExecution,
// streamsTopicService,
log,
db)
// try {
// if (streamsSinkConfiguration.enabled) {
// log.info("[Sink] The Streams Sink module is starting")
// if (KafkaUtil.isCluster(db)) {
// initSinkModule(streamsSinkConfiguration)
// } else {
// runInASingleInstance(streamsSinkConfiguration)
// }
// }
// } catch (e: Exception) {
// log.warn("Cannot start the Streams Sink module because the following exception", e)
// }
//

// log.info("[Sink] Registering the Streams Sink procedures")
StreamsSinkProcedures.registerStreamsEventSink(db, eventSink!!)
}

// private fun initSink() {
// eventSink?.start()
// eventSink?.printInvalidTopics()
// }
//
// private fun runInASingleInstance(streamsSinkConfiguration: StreamsSinkConfiguration) {
// // check if is writeable instance
// ConsumerUtils.executeInWriteableInstance(db) {
// if (streamsSinkConfiguration.clusterOnly) {
// log.info("""
// |Cannot init the Streams Sink module as is forced to work only in a cluster env,
// |please check the value of `${StreamsConfig.CLUSTER_ONLY}`
// """.trimMargin())
// } else {
// initSinkModule(streamsSinkConfiguration)
// }
// }
// }
//
// private fun initSinkModule(streamsSinkConfiguration: StreamsSinkConfiguration) {
// initSink()
// }

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package apoc.kafka.consumer.kafka

import io.confluent.kafka.serializers.KafkaAvroDeserializer
//import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
Expand Down Expand Up @@ -59,9 +59,9 @@ open class KafkaAutoCommitEventConsumer(private val config: KafkaSinkConfigurati

val consumer: KafkaConsumer<*, *> = when {
config.keyDeserializer == ByteArrayDeserializer::class.java.name && config.valueDeserializer == ByteArrayDeserializer::class.java.name -> KafkaConsumer<ByteArray, ByteArray>(config.asProperties())
config.keyDeserializer == ByteArrayDeserializer::class.java.name && config.valueDeserializer == KafkaAvroDeserializer::class.java.name -> KafkaConsumer<ByteArray, GenericRecord>(config.asProperties())
config.keyDeserializer == KafkaAvroDeserializer::class.java.name && config.valueDeserializer == KafkaAvroDeserializer::class.java.name -> KafkaConsumer<GenericRecord, GenericRecord>(config.asProperties())
config.keyDeserializer == KafkaAvroDeserializer::class.java.name && config.valueDeserializer == ByteArrayDeserializer::class.java.name -> KafkaConsumer<GenericRecord, ByteArray>(config.asProperties())
// config.keyDeserializer == ByteArrayDeserializer::class.java.name && config.valueDeserializer == KafkaAvroDeserializer::class.java.name -> KafkaConsumer<ByteArray, GenericRecord>(config.asProperties())
// config.keyDeserializer == KafkaAvroDeserializer::class.java.name && config.valueDeserializer == KafkaAvroDeserializer::class.java.name -> KafkaConsumer<GenericRecord, GenericRecord>(config.asProperties())
// config.keyDeserializer == KafkaAvroDeserializer::class.java.name && config.valueDeserializer == ByteArrayDeserializer::class.java.name -> KafkaConsumer<GenericRecord, ByteArray>(config.asProperties())
else -> throw RuntimeException("Invalid config")
}

Expand Down
Loading

0 comments on commit 35bb9ea

Please sign in to comment.