diff --git a/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/common/ConfigurationMigrator.kt b/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/common/ConfigurationMigrator.kt index 13b4c2b7..ff99f406 100644 --- a/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/common/ConfigurationMigrator.kt +++ b/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/common/ConfigurationMigrator.kt @@ -57,6 +57,8 @@ class ConfigurationMigrator(private val settings: Map) { data class PropertyConverter(val updatedConfigKey: String, val migrationHandler: () -> String) private val propertyConverterMap: Map = mutableMapOf( + // Kafka + "connector.class" to PropertyConverter("connector.class") {convertConnectorClass(settings["connector.class"] as String)}, // Common DATABASE to PropertyConverter("neo4j.database") { settings[DATABASE] as String }, SERVER_URI to PropertyConverter("neo4j.uri") { settings[SERVER_URI] as String }, @@ -74,7 +76,7 @@ class ConfigurationMigrator(private val settings: Map) { CONNECTION_LIVENESS_CHECK_TIMEOUT_MSECS to PropertyConverter("neo4j.pool.idle-time-before-connection-test") { convertMsecs(settings[CONNECTION_LIVENESS_CHECK_TIMEOUT_MSECS] as String) }, CONNECTION_POOL_MAX_SIZE to PropertyConverter("neo4j.pool.max-connection-pool-size") {settings[CONNECTION_POOL_MAX_SIZE] as String}, RETRY_BACKOFF_MSECS to PropertyConverter("neo4j.max-retry-time") { convertMsecs(settings[RETRY_BACKOFF_MSECS] as String) }, - RETRY_MAX_ATTEMPTS to PropertyConverter("neo4j.max-retry-attempts") {settings[RETRY_MAX_ATTEMPTS] as String}, + RETRY_MAX_ATTEMPTS to PropertyConverter("") {settings[RETRY_MAX_ATTEMPTS] as String}, // Sink TOPIC_CDC_SOURCE_ID to PropertyConverter("neo4j.cdc.source-id.topics") {settings[TOPIC_CDC_SOURCE_ID] as String}, TOPIC_CDC_SOURCE_ID_LABEL_NAME to PropertyConverter("neo4j.cdc.source-id.label-name") {settings[TOPIC_CDC_SOURCE_ID_LABEL_NAME] as String}, @@ -95,6 +97,14 @@ class ConfigurationMigrator(private val settings: Map) { ENFORCE_SCHEMA to PropertyConverter("") {settings[ENFORCE_SCHEMA] as String} ) + private fun convertConnectorClass(className: String): String { + return when (className) { + "streams.kafka.connect.source.Neo4jSourceConnector" -> "org.neo4j.connectors.kafka.source.Neo4jConnector" + "streams.kafka.connect.sink.Neo4jSinkConnector" -> "org.neo4j.connectors.kafka.sink.Neo4jConnector" + else -> "" + } + } + // Configuration properties that have user-defined keys private val prefixConverterMap: Map = mutableMapOf( Neo4jSinkConnectorConfig.TOPIC_PATTERN_NODE_PREFIX to "neo4j.pattern.node.topic.", @@ -136,7 +146,6 @@ class ConfigurationMigrator(private val settings: Map) { return updatedConfig } - companion object { /** * Converts milliseconds format into new format of time units diff --git a/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jSinkConnector.kt b/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jSinkConnector.kt index fa16767e..c7e373c6 100644 --- a/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jSinkConnector.kt +++ b/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jSinkConnector.kt @@ -35,8 +35,12 @@ class Neo4jSinkConnector: SinkConnector() { override fun stop() { val migratedConfig = ConfigurationMigrator(settings).migrateToV51() val mapper = ObjectMapper() - val jsonConfig = mapper.writeValueAsString(migratedConfig) - log.info("Migrated Sink configuration to v5.1 connector format: {}", jsonConfig) + val jsonConfig = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(migratedConfig) + log.info( + "The migrated settings for 5.1 version of Neo4j Sink Connector '{}' is: `{}`", + settings["name"], + jsonConfig + ) } override fun version(): String { diff --git a/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/source/Neo4jSourceService.kt b/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/source/Neo4jSourceService.kt index b3b6035d..863598b6 100644 --- a/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/source/Neo4jSourceService.kt +++ b/kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/source/Neo4jSourceService.kt @@ -175,15 +175,20 @@ class Neo4jSourceService(private val config: Neo4jSourceConnectorConfig, offsetS log.info("Error while closing Driver instance:", it) } - val migratedConfig = ConfigurationMigrator(config.originals() as Map).migrateToV51().toMutableMap() + val originalConfig = config.originals() as Map + val migratedConfig = ConfigurationMigrator(originalConfig).migrateToV51().toMutableMap() log.debug("Defaulting v5.1 migrated configuration offset to last checked timestamp: {}", lastCheck) migratedConfig["neo4j.start-from"] = "USER_PROVIDED" migratedConfig["neo4j.start-from.value"] = lastCheck val mapper = ObjectMapper() - val jsonConfig = mapper.writeValueAsString(migratedConfig) - log.info("Migrated Source configuration to v5.1 connector format: {}", jsonConfig) + val jsonConfig = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(migratedConfig) + log.info( + "The migrated settings for 5.1 version of Neo4j Source Connector '{}' is: `{}`", + originalConfig["name"], + jsonConfig + ) log.info("Neo4j Source Service closed successfully") } diff --git a/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/common/ConfigurationMigratorTest.kt b/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/common/ConfigurationMigratorTest.kt index d96909e7..ce379f39 100644 --- a/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/common/ConfigurationMigratorTest.kt +++ b/kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/common/ConfigurationMigratorTest.kt @@ -17,7 +17,7 @@ class ConfigurationMigratorTest { mapOf( "neo4j.topic.pattern.merge.node.properties.enabled" to "true", "neo4j.server.uri" to "neo4j+s://x.x.x.x", - "neo4j.retry.max.attemps" to "1" + "neo4j.encryption.enabled" to "false" ) // When the configuration is migrated @@ -27,7 +27,7 @@ class ConfigurationMigratorTest { assertEquals(originals.size, migratedConfig.size) assertEquals(migratedConfig["neo4j.pattern.node.merge-properties"], "true") assertEquals(migratedConfig["neo4j.uri"], "neo4j+s://x.x.x.x") - assertEquals(migratedConfig["neo4j.max-retry-attempts"], "1") + assertEquals(migratedConfig["neo4j.security.encrypted"], "false") } @Test fun `should not migrate keys with no matching configuration key`() { @@ -35,7 +35,8 @@ class ConfigurationMigratorTest { val originals = mapOf( "neo4j.encryption.ca.certificate.path" to "./cert.pem", "neo4j.source.type" to SourceType.QUERY.toString(), - "neo4j.enforce.schema" to "true" + "neo4j.enforce.schema" to "true", + "neo4j.retry.max.attemps" to "1" ) // When the configuration is migrated @@ -104,7 +105,7 @@ class ConfigurationMigratorTest { // Then those options should still be included assertEquals(originals.size, migratedConfig.size) - assertEquals(migratedConfig["connector.class"], "streams.kafka.connect.source.Neo4jSourceConnector") + assertEquals(migratedConfig["connector.class"], "org.neo4j.connectors.kafka.source.Neo4jConnector") assertEquals(migratedConfig["key.converter"], "io.confluent.connect.avro.AvroConverter") assertEquals(migratedConfig["arbitrary.config.key"], "arbitrary.value") } @@ -121,7 +122,7 @@ class ConfigurationMigratorTest { assertEquals(12, migratedConfig.size) assertEquals(migratedConfig["neo4j.query.topic"], "my-topic") - assertEquals(migratedConfig["connector.class"], "streams.kafka.connect.source.Neo4jSourceConnector") + assertEquals(migratedConfig["connector.class"], "org.neo4j.connectors.kafka.source.Neo4jConnector") assertEquals(migratedConfig["key.converter"], "io.confluent.connect.avro.AvroConverter") assertEquals(migratedConfig["key.converter.schema.registry.url"], "http://schema-registry:8081") assertEquals(migratedConfig["value.converter"], "io.confluent.connect.avro.AvroConverter") @@ -152,7 +153,7 @@ class ConfigurationMigratorTest { assertEquals(15, migratedConfig.size) assertEquals(migratedConfig["topics"], "my-topic") - assertEquals(migratedConfig["connector.class"], "streams.kafka.connect.sink.Neo4jSinkConnector") + assertEquals(migratedConfig["connector.class"], "org.neo4j.connectors.kafka.sink.Neo4jConnector") assertEquals(migratedConfig["key.converter"], "org.apache.kafka.connect.json.JsonConverter") assertEquals(migratedConfig["key.converter.schemas.enable"], "false") assertEquals(migratedConfig["value.converter"], "org.apache.kafka.connect.json.JsonConverter")