Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: update configuration migration properties #650

Merged
merged 6 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ class ConfigurationMigrator(private val settings: Map<String, String>) {
data class PropertyConverter(val updatedConfigKey: String, val migrationHandler: () -> String)

private val propertyConverterMap: Map<String, PropertyConverter> = mutableMapOf(
// Kafka
"connector.class" to PropertyConverter("connector.class") {convertConnectorClass(settings["connector.class"] as String)},
// Common
DATABASE to PropertyConverter("neo4j.database") { settings[DATABASE] as String },
SERVER_URI to PropertyConverter("neo4j.uri") { settings[SERVER_URI] as String },
Expand All @@ -74,7 +76,7 @@ class ConfigurationMigrator(private val settings: Map<String, String>) {
CONNECTION_LIVENESS_CHECK_TIMEOUT_MSECS to PropertyConverter("neo4j.pool.idle-time-before-connection-test") { convertMsecs(settings[CONNECTION_LIVENESS_CHECK_TIMEOUT_MSECS] as String) },
CONNECTION_POOL_MAX_SIZE to PropertyConverter("neo4j.pool.max-connection-pool-size") {settings[CONNECTION_POOL_MAX_SIZE] as String},
RETRY_BACKOFF_MSECS to PropertyConverter("neo4j.max-retry-time") { convertMsecs(settings[RETRY_BACKOFF_MSECS] as String) },
RETRY_MAX_ATTEMPTS to PropertyConverter("neo4j.max-retry-attempts") {settings[RETRY_MAX_ATTEMPTS] as String},
RETRY_MAX_ATTEMPTS to PropertyConverter("") {settings[RETRY_MAX_ATTEMPTS] as String},
// Sink
TOPIC_CDC_SOURCE_ID to PropertyConverter("neo4j.cdc.source-id.topics") {settings[TOPIC_CDC_SOURCE_ID] as String},
TOPIC_CDC_SOURCE_ID_LABEL_NAME to PropertyConverter("neo4j.cdc.source-id.label-name") {settings[TOPIC_CDC_SOURCE_ID_LABEL_NAME] as String},
Expand All @@ -95,6 +97,14 @@ class ConfigurationMigrator(private val settings: Map<String, String>) {
ENFORCE_SCHEMA to PropertyConverter("") {settings[ENFORCE_SCHEMA] as String}
)

private fun convertConnectorClass(className: String): String {
return when (className) {
"streams.kafka.connect.source.Neo4jSourceConnector" -> "org.neo4j.connectors.kafka.source.Neo4jConnector"
"streams.kafka.connect.sink.Neo4jSinkConnector" -> "org.neo4j.connectors.kafka.sink.Neo4jConnector"
else -> ""
}
}

// Configuration properties that have user-defined keys
private val prefixConverterMap: Map<String, String> = mutableMapOf(
Neo4jSinkConnectorConfig.TOPIC_PATTERN_NODE_PREFIX to "neo4j.pattern.node.topic.",
Expand Down Expand Up @@ -136,7 +146,6 @@ class ConfigurationMigrator(private val settings: Map<String, String>) {

return updatedConfig
}

companion object {
/**
* Converts milliseconds format into new format of time units
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class Neo4jSinkConnector: SinkConnector() {
val migratedConfig = ConfigurationMigrator(settings).migrateToV51()
val mapper = ObjectMapper()
val jsonConfig = mapper.writeValueAsString(migratedConfig)
log.info("Migrated Sink configuration to v5.1 connector format: {}", jsonConfig)
log.info("Migrated Neo4j Sink Connector '{}' configuration to v5.1 connector format: {}", settings["name"], jsonConfig)
dhrudevalia marked this conversation as resolved.
Show resolved Hide resolved
}

override fun version(): String {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,15 +175,16 @@ class Neo4jSourceService(private val config: Neo4jSourceConnectorConfig, offsetS
log.info("Error while closing Driver instance:", it)
}

val migratedConfig = ConfigurationMigrator(config.originals() as Map<String, String>).migrateToV51().toMutableMap()
val originalConfig = config.originals() as Map<String, String>
val migratedConfig = ConfigurationMigrator(originalConfig).migrateToV51().toMutableMap()

log.debug("Defaulting v5.1 migrated configuration offset to last checked timestamp: {}", lastCheck)
migratedConfig["neo4j.start-from"] = "USER_PROVIDED"
migratedConfig["neo4j.start-from.value"] = lastCheck

val mapper = ObjectMapper()
val jsonConfig = mapper.writeValueAsString(migratedConfig)
log.info("Migrated Source configuration to v5.1 connector format: {}", jsonConfig)
log.info("Migrated Neo4j Source Connector '{}' configuration to v5.1 connector format: {}", originalConfig["name"], jsonConfig)
dhrudevalia marked this conversation as resolved.
Show resolved Hide resolved

log.info("Neo4j Source Service closed successfully")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class ConfigurationMigratorTest {
mapOf(
"neo4j.topic.pattern.merge.node.properties.enabled" to "true",
"neo4j.server.uri" to "neo4j+s://x.x.x.x",
"neo4j.retry.max.attemps" to "1"
"neo4j.encryption.enabled" to "false"
)

// When the configuration is migrated
Expand All @@ -27,15 +27,16 @@ class ConfigurationMigratorTest {
assertEquals(originals.size, migratedConfig.size)
assertEquals(migratedConfig["neo4j.pattern.node.merge-properties"], "true")
assertEquals(migratedConfig["neo4j.uri"], "neo4j+s://x.x.x.x")
assertEquals(migratedConfig["neo4j.max-retry-attempts"], "1")
assertEquals(migratedConfig["neo4j.security.encrypted"], "false")
}

@Test fun `should not migrate keys with no matching configuration key`() {
// Given a configuration which has no equivalent in the updated connector
val originals = mapOf(
"neo4j.encryption.ca.certificate.path" to "./cert.pem",
"neo4j.source.type" to SourceType.QUERY.toString(),
"neo4j.enforce.schema" to "true"
"neo4j.enforce.schema" to "true",
"neo4j.retry.max.attemps" to "1"
)

// When the configuration is migrated
Expand Down Expand Up @@ -104,7 +105,7 @@ class ConfigurationMigratorTest {

// Then those options should still be included
assertEquals(originals.size, migratedConfig.size)
assertEquals(migratedConfig["connector.class"], "streams.kafka.connect.source.Neo4jSourceConnector")
assertEquals(migratedConfig["connector.class"], "org.neo4j.connectors.kafka.source.Neo4jConnector")
assertEquals(migratedConfig["key.converter"], "io.confluent.connect.avro.AvroConverter")
assertEquals(migratedConfig["arbitrary.config.key"], "arbitrary.value")
}
Expand All @@ -121,7 +122,7 @@ class ConfigurationMigratorTest {
assertEquals(12, migratedConfig.size)

assertEquals(migratedConfig["neo4j.query.topic"], "my-topic")
assertEquals(migratedConfig["connector.class"], "streams.kafka.connect.source.Neo4jSourceConnector")
assertEquals(migratedConfig["connector.class"], "org.neo4j.connectors.kafka.source.Neo4jConnector")
assertEquals(migratedConfig["key.converter"], "io.confluent.connect.avro.AvroConverter")
assertEquals(migratedConfig["key.converter.schema.registry.url"], "http://schema-registry:8081")
assertEquals(migratedConfig["value.converter"], "io.confluent.connect.avro.AvroConverter")
Expand Down Expand Up @@ -152,7 +153,7 @@ class ConfigurationMigratorTest {
assertEquals(15, migratedConfig.size)

assertEquals(migratedConfig["topics"], "my-topic")
assertEquals(migratedConfig["connector.class"], "streams.kafka.connect.sink.Neo4jSinkConnector")
assertEquals(migratedConfig["connector.class"], "org.neo4j.connectors.kafka.sink.Neo4jConnector")
assertEquals(migratedConfig["key.converter"], "org.apache.kafka.connect.json.JsonConverter")
assertEquals(migratedConfig["key.converter.schemas.enable"], "false")
assertEquals(migratedConfig["value.converter"], "org.apache.kafka.connect.json.JsonConverter")
Expand Down