Skip to content

Commit

Permalink
fixed tests
Browse files Browse the repository at this point in the history
  • Loading branch information
vga91 committed Dec 11, 2024
1 parent 5764e30 commit bfef801
Show file tree
Hide file tree
Showing 6 changed files with 8 additions and 57 deletions.
3 changes: 0 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@ repositories {
maven {
url "https://repo.gradle.org/gradle/libs-releases"
}
maven {
url "https://packages.confluent.io/maven"
}
mavenLocal()
}

Expand Down
2 changes: 0 additions & 2 deletions extended/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ dependencies {

compileOnly group: 'org.apache.arrow', name: 'arrow-vector', version: '13.0.0'
compileOnly group: 'org.apache.arrow', name: 'arrow-memory-netty', version: '13.0.0'
compileOnly group: 'io.confluent', name: 'kafka-avro-serializer', version: '7.8.0'

testImplementation group: 'org.apache.arrow', name: 'arrow-vector', version: '13.0.0'
testImplementation group: 'org.apache.arrow', name: 'arrow-memory-netty', version: '13.0.0'
Expand Down Expand Up @@ -179,7 +178,6 @@ dependencies {
testImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: kafkaVersion
testImplementation group: 'org.testcontainers', name: 'kafka', version: testContainersVersion
testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.1"
testImplementation group: 'io.confluent', name: 'kafka-avro-serializer', version: '7.8.0'

configurations.all {
exclude group: 'org.slf4j', module: 'slf4j-nop'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,6 @@ open class KafkaAutoCommitEventConsumer(private val config: KafkaSinkConfigurati

val consumer: KafkaConsumer<*, *> = when {
config.keyDeserializer == ByteArrayDeserializer::class.java.name && config.valueDeserializer == ByteArrayDeserializer::class.java.name -> KafkaConsumer<ByteArray, ByteArray>(config.asProperties())
// config.keyDeserializer == ByteArrayDeserializer::class.java.name && config.valueDeserializer == KafkaAvroDeserializer::class.java.name -> KafkaConsumer<ByteArray, GenericRecord>(config.asProperties())
// config.keyDeserializer == KafkaAvroDeserializer::class.java.name && config.valueDeserializer == KafkaAvroDeserializer::class.java.name -> KafkaConsumer<GenericRecord, GenericRecord>(config.asProperties())
// config.keyDeserializer == KafkaAvroDeserializer::class.java.name && config.valueDeserializer == ByteArrayDeserializer::class.java.name -> KafkaConsumer<GenericRecord, ByteArray>(config.asProperties())
else -> throw RuntimeException("Invalid config")
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
package apoc.kafka.consumer.kafka

import apoc.kafka.extensions.toMap
import apoc.kafka.common.support.KafkaTestUtils
import apoc.util.JsonUtil
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import kotlinx.coroutines.*
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericRecordBuilder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
Expand All @@ -19,7 +15,7 @@ import kotlin.test.*
class KafkaConsumeProceduresTSE : KafkaEventSinkBaseTSE() {

private fun testProcedure(db: GraphDatabaseService, topic: String) {

val producerRecord = ProducerRecord(topic, "{\"id\": \"{${UUID.randomUUID()}}\"}", JsonUtil.writeValueAsBytes(data))
kafkaProducer.send(producerRecord).get()
db.executeTransactionally("CALL apoc.kafka.consume('$topic', {timeout: 5000}) YIELD event RETURN event", emptyMap()) { result ->
Expand Down Expand Up @@ -190,40 +186,4 @@ class KafkaConsumeProceduresTSE : KafkaEventSinkBaseTSE() {
val offsetAndMetadata = kafkaConsumer.committed(TopicPartition(topic, partition))
assertNull(offsetAndMetadata)
}

@Test
fun `should consume AVRO messages`() {
val db = createDbWithKafkaConfigs("apoc.kafka.${ConsumerConfig.GROUP_ID_CONFIG}" to "avroajeje")
val PLACE_SCHEMA = SchemaBuilder.builder("com.namespace")
.record("Place").fields()
.name("name").type().stringType().noDefault()
.name("coordinates").type().array().items().doubleType().noDefault()
.name("citizens").type().longType().noDefault()
.endRecord()
val coordinates = listOf(42.30000, -11.22222)
val citizens = 1_000_000L
val struct = GenericRecordBuilder(PLACE_SCHEMA)
.set("name", "Foo")
.set("coordinates", coordinates)
.set("citizens", citizens)
.build()
val topic = "avro-procedure"
val keyDeserializer = KafkaAvroDeserializer::class.java.name
val valueDeserializer = KafkaAvroDeserializer::class.java.name
kafkaAvroProducer.send(ProducerRecord(topic, null, struct)).get()
val schemaRegistryUrl = KafkaEventSinkSuiteIT.schemaRegistry.getSchemaRegistryUrl()
db.executeTransactionally("""
CALL apoc.kafka.consume('$topic', {timeout: 5000, keyDeserializer: '$keyDeserializer', valueDeserializer: '$valueDeserializer', schemaRegistryUrl: '$schemaRegistryUrl'}) YIELD event
RETURN event
""".trimIndent(), emptyMap()
) { result ->
assertTrue { result.hasNext() }
val resultMap = result.next()
assertTrue { resultMap.containsKey("event") }
assertNotNull(resultMap["event"], "should contain event")
val event = resultMap["event"] as Map<String, Any?>
val resultData = event["data"] as Map<String, Any?>
assertEquals(struct.toMap(), resultData)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package apoc.kafka.consumer.kafka

import apoc.kafka.PublishProcedures
import apoc.kafka.consumer.procedures.StreamsSinkProcedures
import io.confluent.kafka.serializers.KafkaAvroSerializer
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.producer.KafkaProducer
import org.junit.jupiter.api.AfterAll
Expand All @@ -20,6 +19,7 @@ import org.neo4j.graphdb.GraphDatabaseService
import org.neo4j.kernel.api.procedure.GlobalProcedures

import apoc.ExtendedApocConfig.APOC_KAFKA_ENABLED
import org.apache.kafka.common.serialization.ByteArraySerializer

open class KafkaEventSinkBaseTSE {

Expand Down Expand Up @@ -53,7 +53,7 @@ open class KafkaEventSinkBaseTSE {
var temporaryFolder = TemporaryFolder()

lateinit var kafkaProducer: KafkaProducer<String, ByteArray>
lateinit var kafkaAvroProducer: KafkaProducer<GenericRecord, GenericRecord>
lateinit var kafkaCustomProducer: KafkaProducer<GenericRecord, GenericRecord>


// Test data
Expand All @@ -66,11 +66,11 @@ open class KafkaEventSinkBaseTSE {
kafkaProducer = KafkaTestUtils.createProducer(
bootstrapServers = KafkaEventSinkSuiteIT.kafka.bootstrapServers
)
kafkaAvroProducer = KafkaTestUtils.createProducer(
kafkaCustomProducer = KafkaTestUtils.createProducer(
bootstrapServers = KafkaEventSinkSuiteIT.kafka.bootstrapServers,
schemaRegistryUrl = KafkaEventSinkSuiteIT.schemaRegistry.getSchemaRegistryUrl(),
keySerializer = KafkaAvroSerializer::class.java.name,
valueSerializer = KafkaAvroSerializer::class.java.name)
keySerializer = ByteArraySerializer::class.java.name,
valueSerializer = ByteArraySerializer::class.java.name)
}

fun createDbWithKafkaConfigs(vararg pairs: Pair<String, Any>) : GraphDatabaseService {
Expand Down Expand Up @@ -103,8 +103,8 @@ open class KafkaEventSinkBaseTSE {
if (::kafkaProducer.isInitialized) {
kafkaProducer.flushAndClose()
}
if (::kafkaAvroProducer.isInitialized) {
kafkaAvroProducer.flushAndClose()
if (::kafkaCustomProducer.isInitialized) {
kafkaCustomProducer.flushAndClose()
}
}

Expand Down
1 change: 0 additions & 1 deletion extra-dependencies/kafka/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,4 @@ dependencies {
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: jacksonVersion, commonExclusions
implementation group: 'org.jetbrains.kotlinx', name: 'kotlinx-coroutines-core', version: '1.4.2', commonExclusions
implementation group: 'org.apache.kafka', name: 'kafka-clients', version: kafkaVersion, commonExclusions
implementation group: 'io.confluent', name: 'kafka-avro-serializer', version: '7.8.0', commonExclusions
}

0 comments on commit bfef801

Please sign in to comment.