
Commit

restored stuff
vga91 committed Dec 11, 2024
1 parent 7c824bd commit 7f2eae7
Showing 8 changed files with 452 additions and 104 deletions.
3 changes: 3 additions & 0 deletions build.gradle
@@ -43,6 +43,9 @@ repositories {
maven {
url "https://repo.gradle.org/gradle/libs-releases"
}
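// Confluent Maven repository, needed to resolve the kafka-avro-serializer dependency added in extended/build.gradle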
maven {
url "https://packages.confluent.io/maven"
}
mavenLocal()
}

3 changes: 2 additions & 1 deletion extended/build.gradle
@@ -137,7 +137,7 @@ dependencies {

compileOnly group: 'org.apache.arrow', name: 'arrow-vector', version: '13.0.0'
compileOnly group: 'org.apache.arrow', name: 'arrow-memory-netty', version: '13.0.0'

compileOnly group: 'io.confluent', name: 'kafka-avro-serializer', version: '7.8.0'

testImplementation group: 'org.apache.arrow', name: 'arrow-vector', version: '13.0.0'
testImplementation group: 'org.apache.arrow', name: 'arrow-memory-netty', version: '13.0.0'
@@ -179,6 +179,7 @@ dependencies {
testImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: kafkaVersion
testImplementation group: 'org.testcontainers', name: 'kafka', version: testContainersVersion
testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.1"
testImplementation group: 'io.confluent', name: 'kafka-avro-serializer', version: '7.8.0'

configurations.all {
exclude group: 'org.slf4j', module: 'slf4j-nop'
@@ -0,0 +1,229 @@
package apoc.kafka.consumer.kafka

import apoc.kafka.extensions.toMap
import apoc.kafka.common.support.KafkaTestUtils
import apoc.util.JsonUtil
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import kotlinx.coroutines.*
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericRecordBuilder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.junit.Test
import org.neo4j.graphdb.GraphDatabaseService
import java.util.*
import kotlin.test.*

@Suppress("UNCHECKED_CAST", "DEPRECATION")
class KafkaConsumeProceduresTSE : KafkaEventSinkBaseTSE() {

private fun testProcedure(db: GraphDatabaseService, topic: String) {

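// produce a single JSON record, then read it back through apoc.kafka.consume and compare the payload with the original map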
val producerRecord = ProducerRecord(topic, "{\"id\": \"{${UUID.randomUUID()}}\"}", JsonUtil.writeValueAsBytes(data))
kafkaProducer.send(producerRecord).get()
db.executeTransactionally("CALL apoc.kafka.consume('$topic', {timeout: 5000}) YIELD event RETURN event", emptyMap()) { result ->
assertTrue { result.hasNext() }
val resultMap = result.next()
assertTrue { resultMap.containsKey("event") }
assertNotNull(resultMap["event"], "should contain event")
val event = resultMap["event"] as Map<String, Any?>
val resultData = event["data"] as Map<String, Any?>
assertEquals(data, resultData)
}
}

@Test
fun shouldConsumeDataFromProcedureWithSinkDisabled() {
val db = createDbWithKafkaConfigs(
"apoc.kafka.sink.enabled" to "false",
"apoc.kafka.${ConsumerConfig.GROUP_ID_CONFIG}" to "1"
)

val topic = "bar"
testProcedure(db, topic)
}

@Test
fun shouldConsumeDataFromProcedure() {
val db = createDbWithKafkaConfigs("apoc.kafka.${ConsumerConfig.GROUP_ID_CONFIG}" to "2")
val topic = "foo"
testProcedure(db, topic)
}

@Test
fun shouldTimeout() {
val db = createDbWithKafkaConfigs()
db.executeTransactionally("CALL apoc.kafka.consume('foo1', {timeout: 2000}) YIELD event RETURN event", emptyMap()) {
assertFalse { it.hasNext() }
}
}

@Test
fun shouldReadSimpleDataType() {
val db = createDbWithKafkaConfigs("apoc.kafka.${ConsumerConfig.GROUP_ID_CONFIG}" to "3")

val topic = "simple-data"
val simpleInt = 1
val simpleBoolean = true
val simpleString = "test"
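// produce an int, a boolean and a string; each value should end up as a LOG node via apoc.kafka.consume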
var producerRecord = ProducerRecord(topic, "{\"a\":1}", JsonUtil.writeValueAsBytes(simpleInt))
kafkaProducer.send(producerRecord).get()
producerRecord = ProducerRecord(topic, "{\"a\":2}", JsonUtil.writeValueAsBytes(simpleBoolean))
kafkaProducer.send(producerRecord).get()
producerRecord = ProducerRecord(topic, "{\"a\":3}", JsonUtil.writeValueAsBytes(simpleString))
kafkaProducer.send(producerRecord).get()
db.executeTransactionally("""
CALL apoc.kafka.consume('$topic', {timeout: 5000}) YIELD event
MERGE (t:LOG{simpleData: event.data})
RETURN count(t) AS insert
""".trimIndent())
db.executeTransactionally("""
MATCH (l:LOG)
WHERE l.simpleData IN [$simpleInt, $simpleBoolean, "$simpleString"]
RETURN count(l) as count
""".trimIndent(), emptyMap()
) { searchResult ->
assertTrue { searchResult.hasNext() }
val searchResultMap = searchResult.next()
assertTrue { searchResultMap.containsKey("count") }
assertEquals(3L, searchResultMap["count"])
}
}

@Test
fun shouldReadATopicPartitionStartingFromAnOffset() = runBlocking {
val db = createDbWithKafkaConfigs()

val topic = "read-from-range"
val partition = 0
var start = -1L
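// send 10 records to one partition and remember the offset of the 6th; consuming from that offset should return records 6..10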
(1..10).forEach {
val producerRecord = ProducerRecord(topic, partition, "{\"a\":1}", JsonUtil.writeValueAsBytes("{\"b\":${it}}"))
val recordMetadata = kafkaProducer.send(producerRecord).get()
if (it == 6) {
start = recordMetadata.offset()
}
}
delay(3000)
db.executeTransactionally("""
CALL apoc.kafka.consume('$topic', {timeout: 5000, partitions: [{partition: $partition, offset: $start}]}) YIELD event
CREATE (t:LOG{simpleData: event.data})
RETURN count(t) AS insert
""".trimIndent())

val count = db.executeTransactionally("""
MATCH (l:LOG)
RETURN count(l) as count
""".trimIndent(), emptyMap()
) {
it.columnAs<Long>("count").next()
}
assertEquals(5L, count)
}

@Test
fun shouldReadFromLatest() = runBlocking {
val db = createDbWithKafkaConfigs()

val topic = "simple-data-from-latest"
val simpleString = "test"
val partition = 0
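// produce 10 records before consuming; with from: 'latest' only the record produced afterwards (in the coroutine below) should be read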
(1..10).forEach {
val producerRecord = ProducerRecord(topic, partition, "{\"a\":${it}}", JsonUtil.writeValueAsBytes("{\"b\":${it}}"))
kafkaProducer.send(producerRecord).get()
}
delay(1000) // the ten records above should be skipped when reading from 'latest'
GlobalScope.launch(Dispatchers.IO) {
delay(1000)
val producerRecord = ProducerRecord(topic, partition, "{\"a\":1}", JsonUtil.writeValueAsBytes(simpleString))
kafkaProducer.send(producerRecord).get()
}
db.executeTransactionally("""
CALL apoc.kafka.consume('$topic', {timeout: 5000, from: 'latest', groupId: 'foo'}) YIELD event
CREATE (t:LOG{simpleData: event.data})
RETURN count(t) AS insert
""".trimIndent())
db.executeTransactionally("""
MATCH (l:LOG)
RETURN count(l) AS count
""".trimIndent(), emptyMap()
) { searchResult ->
assertTrue { searchResult.hasNext() }
val searchResultMap = searchResult.next()
assertTrue { searchResultMap.containsKey("count") }
assertEquals(1L, searchResultMap["count"])
}
Unit
}

@Test
fun shouldNotCommit() {
val db = createDbWithKafkaConfigs(
"enable.auto.commit" to false,
"apoc.kafka.${ConsumerConfig.GROUP_ID_CONFIG}" to "ajeje"
)

val topic = "simple-data"
val simpleInt = 1
val partition = 0
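// with autoCommit and commit disabled, the record is read but no offset should be committed for the consumer group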
var producerRecord = ProducerRecord(topic, partition, "{\"a\":1}", JsonUtil.writeValueAsBytes("{\"b\":${simpleInt}}"))
kafkaProducer.send(producerRecord).get()
db.executeTransactionally("""
CALL apoc.kafka.consume('$topic', {timeout: 5000, autoCommit: false, commit:false}) YIELD event
MERGE (t:LOG{simpleData: event.data})
RETURN count(t) AS insert
""".trimIndent())
db.executeTransactionally("""
MATCH (l:LOG)
RETURN count(l) as count
""".trimIndent(), emptyMap()
) { searchResult ->
assertTrue { searchResult.hasNext() }
val searchResultMap = searchResult.next()
assertTrue { searchResultMap.containsKey("count") }
assertEquals(1L, searchResultMap["count"])
}
val kafkaConsumer = KafkaTestUtils.createConsumer<String, ByteArray>(
bootstrapServers = KafkaEventSinkSuiteIT.kafka.bootstrapServers,
schemaRegistryUrl = KafkaEventSinkSuiteIT.schemaRegistry.getSchemaRegistryUrl())
val offsetAndMetadata = kafkaConsumer.committed(TopicPartition(topic, partition))
assertNull(offsetAndMetadata)
}

@Test
fun `should consume AVRO messages`() {
val db = createDbWithKafkaConfigs("apoc.kafka.${ConsumerConfig.GROUP_ID_CONFIG}" to "avroajeje")
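// build an Avro GenericRecord, publish it with the Avro producer, then consume it via apoc.kafka.consume using KafkaAvroDeserializer and the schema registry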
val PLACE_SCHEMA = SchemaBuilder.builder("com.namespace")
.record("Place").fields()
.name("name").type().stringType().noDefault()
.name("coordinates").type().array().items().doubleType().noDefault()
.name("citizens").type().longType().noDefault()
.endRecord()
val coordinates = listOf(42.30000, -11.22222)
val citizens = 1_000_000L
val struct = GenericRecordBuilder(PLACE_SCHEMA)
.set("name", "Foo")
.set("coordinates", coordinates)
.set("citizens", citizens)
.build()
val topic = "avro-procedure"
val keyDeserializer = KafkaAvroDeserializer::class.java.name
val valueDeserializer = KafkaAvroDeserializer::class.java.name
kafkaAvroProducer.send(ProducerRecord(topic, null, struct)).get()
val schemaRegistryUrl = KafkaEventSinkSuiteIT.schemaRegistry.getSchemaRegistryUrl()
db.executeTransactionally("""
CALL apoc.kafka.consume('$topic', {timeout: 5000, keyDeserializer: '$keyDeserializer', valueDeserializer: '$valueDeserializer', schemaRegistryUrl: '$schemaRegistryUrl'}) YIELD event
RETURN event
""".trimIndent(), emptyMap()
) { result ->
assertTrue { result.hasNext() }
val resultMap = result.next()
assertTrue { resultMap.containsKey("event") }
assertNotNull(resultMap["event"], "should contain event")
val event = resultMap["event"] as Map<String, Any?>
val resultData = event["data"] as Map<String, Any?>
assertEquals(struct.toMap(), resultData)
}
}
}
@@ -0,0 +1,116 @@
package apoc.kafka.consumer.kafka

import apoc.kafka.PublishProcedures
import apoc.kafka.consumer.procedures.StreamsSinkProcedures
import io.confluent.kafka.serializers.KafkaAvroSerializer
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.producer.KafkaProducer
import org.junit.jupiter.api.AfterAll
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.BeforeEach
import apoc.kafka.common.support.KafkaTestUtils
import apoc.util.DbmsTestUtil
import apoc.util.TestUtil
import org.junit.*
import org.junit.rules.TemporaryFolder
import org.neo4j.configuration.GraphDatabaseSettings
import org.neo4j.dbms.api.DatabaseManagementService
import org.neo4j.graphdb.GraphDatabaseService
import org.neo4j.kernel.api.procedure.GlobalProcedures

import apoc.ExtendedApocConfig.APOC_KAFKA_ENABLED

open class KafkaEventSinkBaseTSE {

companion object {
private var startedFromSuite = true

lateinit var dbms: DatabaseManagementService

@BeforeClass
@BeforeAll
@JvmStatic
fun setUpContainer() {
if (!KafkaEventSinkSuiteIT.isRunning) {
startedFromSuite = false
KafkaEventSinkSuiteIT.setUpContainer()
}
}

@AfterClass
@AfterAll
@JvmStatic
fun tearDownContainer() {
if (!startedFromSuite) {
KafkaEventSinkSuiteIT.tearDownContainer()
}
}
}

@JvmField
@Rule
var temporaryFolder = TemporaryFolder()

lateinit var kafkaProducer: KafkaProducer<String, ByteArray>
lateinit var kafkaAvroProducer: KafkaProducer<GenericRecord, GenericRecord>


// Test data
val dataProperties = mapOf("prop1" to "foo", "bar" to 1)
val data = mapOf("id" to 1, "properties" to dataProperties)

@Before
@BeforeEach
fun setUp() {
kafkaProducer = KafkaTestUtils.createProducer(
bootstrapServers = KafkaEventSinkSuiteIT.kafka.bootstrapServers
)
kafkaAvroProducer = KafkaTestUtils.createProducer(
bootstrapServers = KafkaEventSinkSuiteIT.kafka.bootstrapServers,
schemaRegistryUrl = KafkaEventSinkSuiteIT.schemaRegistry.getSchemaRegistryUrl(),
keySerializer = KafkaAvroSerializer::class.java.name,
valueSerializer = KafkaAvroSerializer::class.java.name)
}

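// starts a DBMS with APOC Kafka enabled and default bootstrap/sink settings, merged with any per-test overrides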
fun createDbWithKafkaConfigs(vararg pairs: Pair<String, Any>) : GraphDatabaseService {
val configMap = mutableMapOf<String, Any>(
"apoc.kafka.bootstrap.servers" to KafkaEventSinkSuiteIT.kafka.bootstrapServers,
APOC_KAFKA_ENABLED to "true",
"bootstrap.servers" to KafkaEventSinkSuiteIT.kafka.bootstrapServers,
"apoc.kafka.sink.enabled" to "true"
)

configMap.putAll(pairs)

dbms = DbmsTestUtil.startDbWithApocConfigs(
temporaryFolder,
configMap as Map<String, Any>?
)
return getDbServices()
}

private fun <K, V> KafkaProducer<K, V>.flushAndClose() {
this.flush()
this.close()
}

@After
@AfterEach
fun tearDown() {
dbms.shutdown()

if (::kafkaProducer.isInitialized) {
kafkaProducer.flushAndClose()
}
if (::kafkaAvroProducer.isInitialized) {
kafkaAvroProducer.flushAndClose()
}
}

private fun getDbServices(): GraphDatabaseService {
val db = dbms.database(GraphDatabaseSettings.DEFAULT_DATABASE_NAME)
TestUtil.registerProcedure(db, StreamsSinkProcedures::class.java, GlobalProcedures::class.java, PublishProcedures::class.java)
return db
}
}
(4 more changed files not shown here)
