Preview/nussknacker cloud schemaless #7212

Draft · wants to merge 16 commits into staging

Changes from all commits
1 change: 1 addition & 0 deletions docs/integration/KafkaIntegration.md
@@ -204,6 +204,7 @@ Important thing to remember is that Kafka server addresses/Schema Registry addresses
| schemaRegistryCacheConfig.parsedSchemaAccessExpirationTime | Low | duration | 2 hours | How long a parsed schema is cached after it is first accessed |
| schemaRegistryCacheConfig.maximumSize | Low | number | 10000 | Maximum number of entries in each cache: the available-schemas cache and the parsed-schema cache |
| avroAsJsonSerialization | Low | boolean | false | Send and receive JSON messages described by an Avro schema |
| showTopicsWithoutSchema | Low | boolean | true | Determines whether all Kafka topics are displayed, or only topics with a matching schema in the schema registry |
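As an illustration of the new toggle, a minimal sketch of flipping it programmatically; `baseConfig` stands for any already-loaded `KafkaConfig` (see the case-class change further down in this diff), and the helper itself is hypothetical, not part of the PR:

import pl.touk.nussknacker.engine.kafka.KafkaConfig

// Sketch only: in practice the flag is set through the component's configuration;
// `copy` works because KafkaConfig is a case class and the field defaults to true.
def restrictToSchemaBackedTopics(baseConfig: KafkaConfig): KafkaConfig =
  baseConfig.copy(showTopicsWithoutSchema = false)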

### Exception handling

@@ -6,7 +6,10 @@ import pl.touk.nussknacker.engine.schemedkafka.helpers.KafkaAvroSpecMixin
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.SchemaRegistryClientFactory
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.UniversalSchemaBasedSerdeProvider
import pl.touk.nussknacker.engine.schemedkafka.source.UniversalKafkaSourceFactory
-import pl.touk.nussknacker.engine.schemedkafka.{AllTopicsSelectionStrategy, TopicPatternSelectionStrategy}
+import pl.touk.nussknacker.engine.schemedkafka.{
+  TopicsMatchingPatternWithExistingSubjectsSelectionStrategy,
+  TopicsWithExistingSubjectSelectionStrategy
+}

import java.util.regex.Pattern

@@ -21,8 +24,8 @@ class TopicSelectionStrategySpec extends KafkaAvroSpecMixin with KafkaAvroSource
private lazy val confluentClient = schemaRegistryClientFactory.create(kafkaConfig)

test("all topic strategy test") {
-    val strategy = new AllTopicsSelectionStrategy()
-    strategy.getTopics(confluentClient).toList.map(_.toSet) shouldBe List(
+    val strategy = new TopicsWithExistingSubjectSelectionStrategy()
+    strategy.getTopics(confluentClient, kafkaConfig).toList.map(_.toSet) shouldBe List(
Set(
RecordTopic,
RecordTopicWithKey,
@@ -37,8 +40,10 @@ class TopicSelectionStrategySpec extends KafkaAvroSpecMixin with KafkaAvroSource
}

test("topic filtering strategy test") {
-    val strategy = new TopicPatternSelectionStrategy(Pattern.compile(".*Record.*"))
-    strategy.getTopics(confluentClient).toList shouldBe List(List(ArrayOfRecordsTopic, RecordTopic, RecordTopicWithKey))
+    val strategy = new TopicsMatchingPatternWithExistingSubjectsSelectionStrategy(Pattern.compile(".*Record.*"))
+    strategy.getTopics(confluentClient, kafkaConfig).toList shouldBe List(
+      List(ArrayOfRecordsTopic, RecordTopic, RecordTopicWithKey)
+    )
}

test("show how to override topic selection strategy") {
@@ -48,7 +53,8 @@ class TopicSelectionStrategySpec extends KafkaAvroSpecMixin with KafkaAvroSource
testModelDependencies,
new FlinkKafkaSourceImplFactory(None)
) {
-      override def topicSelectionStrategy = new TopicPatternSelectionStrategy(Pattern.compile("test-.*"))
+      override def topicSelectionStrategy =
+        new TopicsMatchingPatternWithExistingSubjectsSelectionStrategy(Pattern.compile("test-.*"))
}
}

@@ -0,0 +1,108 @@
package pl.touk.nussknacker.defaultmodel

import io.circe.{Json, parser}
import pl.touk.nussknacker.engine.api.process.TopicName.ForSource
import pl.touk.nussknacker.engine.api.validation.ValidationMode
import pl.touk.nussknacker.engine.build.ScenarioBuilder
import pl.touk.nussknacker.engine.graph.expression.Expression
import pl.touk.nussknacker.engine.kafka.KafkaTestUtils.richConsumer
import pl.touk.nussknacker.engine.schemedkafka.KafkaUniversalComponentTransformer
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.ContentTypes
import pl.touk.nussknacker.engine.spel.SpelExtension.SpelExpresion

import java.nio.charset.StandardCharsets
import java.time.Instant

class KafkaJsonItSpec extends FlinkWithKafkaSuite {

private val jsonRecord = Json.obj(
"first" -> Json.fromString("Jan"),
"middle" -> Json.fromString("Tomek"),
"last" -> Json.fromString("Kowalski")
)

test("should round-trip json message without provided schema") {

val inputTopic = "input-topic-without-schema-json"
val outputTopic = "output-topic-without-schema-json"

kafkaClient.createTopic(inputTopic, 1)
kafkaClient.createTopic(outputTopic, 1)
sendAsJson(jsonRecord.toString, ForSource(inputTopic), Instant.now.toEpochMilli)

val process =
ScenarioBuilder
.streaming("without-schema")
.parallelism(1)
.source(
"start",
"kafka",
KafkaUniversalComponentTransformer.topicParamName.value -> Expression.spel(s"'$inputTopic'"),
KafkaUniversalComponentTransformer.contentTypeParamName.value -> s"'${ContentTypes.JSON.toString}'".spel
)
.emptySink(
"end",
"kafka",
KafkaUniversalComponentTransformer.sinkKeyParamName.value -> "".spel,
KafkaUniversalComponentTransformer.sinkRawEditorParamName.value -> "true".spel,
KafkaUniversalComponentTransformer.sinkValueParamName.value -> "#input".spel,
KafkaUniversalComponentTransformer.topicParamName.value -> s"'$outputTopic'".spel,
KafkaUniversalComponentTransformer.contentTypeParamName.value -> s"'${ContentTypes.JSON.toString}'".spel,
KafkaUniversalComponentTransformer.sinkValidationModeParamName.value -> s"'${ValidationMode.lax.name}'".spel
)

run(process) {
val outputRecord = kafkaClient.createConsumer().consumeWithConsumerRecord(outputTopic).take(1).head
val parsedOutput = parser
.parse(new String(outputRecord.value(), StandardCharsets.UTF_8))
.fold(throw _, identity)

parsedOutput shouldBe jsonRecord
}
}

ignore("should round-trip plain message without provided schema") {
val inputTopic = "input-topic-without-schema-plain"
val outputTopic = "output-topic-without-schema-plain"

kafkaClient.createTopic(inputTopic, 1)
kafkaClient.createTopic(outputTopic, 1)
kafkaClient.sendRawMessage(
inputTopic,
Array.empty,
jsonRecord.toString().getBytes,
timestamp = Instant.now.toEpochMilli
)
val process =
ScenarioBuilder
.streaming("without-schema")
.parallelism(1)
.source(
"start",
"kafka",
KafkaUniversalComponentTransformer.topicParamName.value -> Expression.spel(s"'$inputTopic'"),
KafkaUniversalComponentTransformer.contentTypeParamName.value -> s"'${ContentTypes.PLAIN.toString}'".spel
)
.emptySink(
"end",
"kafka",
KafkaUniversalComponentTransformer.sinkKeyParamName.value -> "".spel,
KafkaUniversalComponentTransformer.sinkRawEditorParamName.value -> "true".spel,
KafkaUniversalComponentTransformer.sinkValueParamName.value -> "#input".spel,
KafkaUniversalComponentTransformer.topicParamName.value -> s"'$outputTopic'".spel,
KafkaUniversalComponentTransformer.contentTypeParamName.value -> s"'${ContentTypes.PLAIN.toString}'".spel,
KafkaUniversalComponentTransformer.sinkValidationModeParamName.value -> s"'${ValidationMode.lax.name}'".spel
)

run(process) {
val outputRecord = kafkaClient.createConsumer().consumeWithConsumerRecord(outputTopic).take(1).head

val parsedOutput = parser
.parse(new String(outputRecord.value(), StandardCharsets.UTF_8))
.fold(throw _, identity)

parsedOutput shouldBe jsonRecord
}
}

}
@@ -29,7 +29,8 @@ case class KafkaConfig(
avroAsJsonSerialization: Option[Boolean] = None,
kafkaAddress: Option[String] = None,
idleTimeout: Option[IdlenessConfig] = None,
-  sinkDeliveryGuarantee: Option[SinkDeliveryGuarantee.Value] = None
+  sinkDeliveryGuarantee: Option[SinkDeliveryGuarantee.Value] = None,
+  showTopicsWithoutSchema: Boolean = true
) {

def schemaRegistryClientKafkaConfig = SchemaRegistryClientKafkaConfig(
@@ -26,6 +26,7 @@ object KafkaUniversalComponentTransformer {
final val sinkValueParamName = ParameterName("Value")
final val sinkValidationModeParamName = ParameterName("Value validation mode")
final val sinkRawEditorParamName = ParameterName("Raw editor")
final val contentTypeParamName = ParameterName("Content type")

def extractValidationMode(value: String): ValidationMode =
ValidationMode.fromString(value, sinkValidationModeParamName)
@@ -46,7 +47,11 @@ abstract class KafkaUniversalComponentTransformer[T, TN <: TopicName: TopicValid
@transient protected lazy val schemaRegistryClient: SchemaRegistryClient =
schemaRegistryClientFactory.create(kafkaConfig)

-  protected def topicSelectionStrategy: TopicSelectionStrategy = new AllTopicsSelectionStrategy
+  protected def topicSelectionStrategy: TopicSelectionStrategy = {
+    if (kafkaConfig.showTopicsWithoutSchema) {
+      new AllNonHiddenTopicsSelectionStrategy
+    } else new TopicsWithExistingSubjectSelectionStrategy
+  }

@transient protected lazy val kafkaConfig: KafkaConfig = prepareKafkaConfig

@@ -62,7 +67,7 @@ abstract class KafkaUniversalComponentTransformer[T, TN <: TopicName: TopicValid
protected def getTopicParam(
implicit nodeId: NodeId
): WithError[ParameterCreatorWithNoDependency with ParameterExtractor[String]] = {
-    val topics = topicSelectionStrategy.getTopics(schemaRegistryClient)
+    val topics = topicSelectionStrategy.getTopics(schemaRegistryClient, kafkaConfig)

(topics match {
case Valid(topics) => Writer[List[ProcessCompilationError], List[UnspecializedTopicName]](Nil, topics)
@@ -95,18 +100,38 @@
)
}

-  protected def getVersionParam(
+  protected def getVersionOrContentTypeParam(
preparedTopic: PreparedKafkaTopic[TN],
)(implicit nodeId: NodeId): WithError[ParameterCreatorWithNoDependency with ParameterExtractor[String]] = {
-    val versions = schemaRegistryClient.getAllVersions(preparedTopic.prepared.toUnspecialized, isKey = false)
-    (versions match {
-      case Valid(versions) => Writer[List[ProcessCompilationError], List[Integer]](Nil, versions)
-      case Invalid(e) =>
-        Writer[List[ProcessCompilationError], List[Integer]](
-          List(CustomNodeError(e.getMessage, Some(topicParamName))),
-          Nil
-        )
-    }).map(getVersionParam)
+    if (schemaRegistryClient.isTopicWithSchema(
+        preparedTopic.prepared.topicName.toUnspecialized.name,
+        topicSelectionStrategy,
+        kafkaConfig
+      )) {
+      val versions = schemaRegistryClient.getAllVersions(preparedTopic.prepared.toUnspecialized, isKey = false)
+      (versions match {
+        case Valid(versions) => Writer[List[ProcessCompilationError], List[Integer]](Nil, versions)
+        case Invalid(e) =>
+          Writer[List[ProcessCompilationError], List[Integer]](
+            List(CustomNodeError(e.getMessage, Some(topicParamName))),
+            Nil
+          )
+      }).map(getVersionParam)
+    } else {
+      val contentTypesValues = List(
+        FixedExpressionValue(s"'${ContentTypes.JSON}'", s"${ContentTypes.JSON}"),
+        // TODO: uncomment once PLAIN is working correctly
+        // FixedExpressionValue(s"'${ContentTypes.PLAIN}'", s"${ContentTypes.PLAIN}")
+      )
+
+      Writer[List[ProcessCompilationError], List[FixedExpressionValue]](Nil, contentTypesValues).map(contentTypes =>
+        ParameterDeclaration
+          .mandatory[String](KafkaUniversalComponentTransformer.contentTypeParamName)
+          .withCreator(
+            modify = _.copy(editor = Some(FixedValuesParameterEditor(contentTypes)))
+          )
+      )
+    }
  }

protected def getVersionParam(
@@ -189,13 +214,13 @@ abstract class KafkaUniversalComponentTransformer[T, TN <: TopicName: TopicValid
nextParams: List[Parameter]
)(implicit nodeId: NodeId): ContextTransformationDefinition = {
case TransformationStep((topicParamName, DefinedEagerParameter(topic: String, _)) :: Nil, _) =>
-      val preparedTopic = prepareTopic(topic)
-      val versionParam  = getVersionParam(preparedTopic)
+      val preparedTopic             = prepareTopic(topic)
+      val versionOrContentTypeParam = getVersionOrContentTypeParam(preparedTopic)
      val topicValidationErrors =
        validateTopic(preparedTopic.prepared).swap.toList.map(_.toCustomNodeError(nodeId.id, Some(topicParamName)))
      NextParameters(
-        versionParam.value.createParameter() :: nextParams,
-        errors = versionParam.written ++ topicValidationErrors
+        versionOrContentTypeParam.value.createParameter() :: nextParams,
+        errors = versionOrContentTypeParam.written ++ topicValidationErrors
)
case TransformationStep((`topicParamName`, _) :: Nil, _) =>
NextParameters(parameters = fallbackVersionOptionParam.createParameter() :: nextParams)
@@ -210,5 +235,7 @@ abstract class KafkaUniversalComponentTransformer[T, TN <: TopicName: TopicValid

// override it if you use other parameter name for topic
@transient protected lazy val topicParamName: ParameterName = KafkaUniversalComponentTransformer.topicParamName
@transient protected lazy val contentTypeParamName: ParameterName =
KafkaUniversalComponentTransformer.contentTypeParamName

}
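To summarize the transformer change in one place: the parameter that follows the topic choice now depends on whether the topic is schema-backed. A hedged sketch (the helper below is hypothetical and the schema-version display name is an assumption; only `contentTypeParamName` is taken from this diff):

import pl.touk.nussknacker.engine.api.parameter.ParameterName

// Hypothetical summary of getVersionOrContentTypeParam's branching: schema-backed
// topics keep the schema-version dropdown, schemaless topics get the new
// "Content type" dropdown (JSON only until PLAIN works).
def secondParamFor(topicHasSchema: Boolean): ParameterName =
  if (topicHasSchema) ParameterName("Schema version") // assumed display name
  else KafkaUniversalComponentTransformer.contentTypeParamName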
@@ -1,32 +1,72 @@
package pl.touk.nussknacker.engine.schemedkafka

import cats.data.Validated
-import pl.touk.nussknacker.engine.kafka.UnspecializedTopicName
+import org.apache.kafka.clients.admin.ListTopicsOptions
+import org.apache.kafka.common.KafkaException
+import pl.touk.nussknacker.engine.kafka.{KafkaConfig, KafkaUtils, UnspecializedTopicName}
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{SchemaRegistryClient, SchemaRegistryError}

import java.util.regex.Pattern
import scala.jdk.CollectionConverters._

trait TopicSelectionStrategy extends Serializable {

def getTopics(
-      schemaRegistryClient: SchemaRegistryClient
+      schemaRegistryClient: SchemaRegistryClient,
+      kafkaConfig: KafkaConfig
): Validated[SchemaRegistryError, List[UnspecializedTopicName]]

}

-class AllTopicsSelectionStrategy extends TopicSelectionStrategy {
+class TopicsWithExistingSubjectSelectionStrategy extends TopicSelectionStrategy {

override def getTopics(
-      schemaRegistryClient: SchemaRegistryClient
-  ): Validated[SchemaRegistryError, List[UnspecializedTopicName]] =
+      schemaRegistryClient: SchemaRegistryClient,
+      kafkaConfig: KafkaConfig
+  ): Validated[SchemaRegistryError, List[UnspecializedTopicName]] = {
schemaRegistryClient.getAllTopics
}

}

class AllNonHiddenTopicsSelectionStrategy extends TopicSelectionStrategy {

override def getTopics(
schemaRegistryClient: SchemaRegistryClient,
kafkaConfig: KafkaConfig
): Validated[SchemaRegistryError, List[UnspecializedTopicName]] = {
val topicsFromSchemaRegistry = schemaRegistryClient.getAllTopics
val validatorConfig = kafkaConfig.topicsExistenceValidationConfig.validatorConfig
val schemaLessTopics: List[UnspecializedTopicName] = {
try {
KafkaUtils.usingAdminClient(kafkaConfig) {
_.listTopics(new ListTopicsOptions().timeoutMs(validatorConfig.adminClientTimeout.toMillis.toInt))
.names()
.get()
.asScala
.toSet
.map(UnspecializedTopicName.apply)
.filterNot(topic => topic.name.startsWith("_"))
.toList
}
} catch {
// In some tests a dummy Kafka address is configured, so listing topics from Kafka fails
case _: KafkaException =>
List.empty
}
}

topicsFromSchemaRegistry.map(topics => (topics ++ schemaLessTopics).distinct)
}

}

-class TopicPatternSelectionStrategy(val topicPattern: Pattern) extends TopicSelectionStrategy {
+class TopicsMatchingPatternWithExistingSubjectsSelectionStrategy(val topicPattern: Pattern)
+    extends TopicSelectionStrategy {

override def getTopics(
-      schemaRegistryClient: SchemaRegistryClient
+      schemaRegistryClient: SchemaRegistryClient,
+      kafkaConfig: KafkaConfig
): Validated[SchemaRegistryError, List[UnspecializedTopicName]] =
schemaRegistryClient.getAllTopics.map(_.filter(topic => topicPattern.matcher(topic.name).matches()))

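For a side-by-side feel of the three strategies after the rename, a usage sketch; `schemaRegistryClient` and `kafkaConfig` are assumed to be in scope (as in the spec earlier in this diff):

import java.util.regex.Pattern

// Topics that have a matching subject in the schema registry
// (formerly AllTopicsSelectionStrategy):
new TopicsWithExistingSubjectSelectionStrategy().getTopics(schemaRegistryClient, kafkaConfig)

// Schema-backed topics plus all non-internal Kafka topics; names starting
// with '_' are hidden, and a KafkaException while listing topics degrades
// to the schema-registry list alone:
new AllNonHiddenTopicsSelectionStrategy().getTopics(schemaRegistryClient, kafkaConfig)

// Schema-backed topics whose name matches a pattern
// (formerly TopicPatternSelectionStrategy):
new TopicsMatchingPatternWithExistingSubjectsSelectionStrategy(Pattern.compile("test-.*"))
  .getTopics(schemaRegistryClient, kafkaConfig)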
@@ -0,0 +1,14 @@
package pl.touk.nussknacker.engine.schemedkafka.schemaregistry

import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.OpenAPIJsonSchema

object ContentTypes extends Enumeration {
type ContentType = Value

val JSON, PLAIN = Value
}

object ContentTypesSchemas {
val schemaForJson: OpenAPIJsonSchema = OpenAPIJsonSchema("{}")
val schemaForPlain: OpenAPIJsonSchema = OpenAPIJsonSchema("")
}
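The two placeholder schemas give schemaless topics something to run through the existing schema-based machinery: presumably "{}" (the empty JSON Schema, which accepts any JSON value) backs JSON topics, and "" backs PLAIN. A hedged sketch of pairing them with the enum above (this helper is hypothetical, not in the PR):

import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.OpenAPIJsonSchema

// Hypothetical helper: pick the permissive fallback schema for a content type.
def fallbackSchemaFor(contentType: ContentTypes.ContentType): OpenAPIJsonSchema =
  contentType match {
    case ContentTypes.JSON  => ContentTypesSchemas.schemaForJson  // "{}" accepts any JSON
    case ContentTypes.PLAIN => ContentTypesSchemas.schemaForPlain // "" (PLAIN currently disabled)
  }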