From 28a2c2a360e51c293057381e0e2c218899aec871 Mon Sep 17 00:00:00 2001 From: Knative Prow Robot Date: Tue, 13 Feb 2024 18:32:42 +0000 Subject: [PATCH] [release-1.12] feat: kafka broker now supports all kafka topic config options (#3687) * feat: kafka broker now supports all kafka topic config options Signed-off-by: Calum Murray * test: added unit tests, fixed existing tests Signed-off-by: Calum Murray --------- Signed-off-by: Calum Murray Co-authored-by: Calum Murray --- control-plane/pkg/kafka/topic.go | 11 +++++++++++ control-plane/pkg/kafka/topic_test.go | 22 ++++++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/control-plane/pkg/kafka/topic.go b/control-plane/pkg/kafka/topic.go index 71c3ab095a..c3ffc833e9 100644 --- a/control-plane/pkg/kafka/topic.go +++ b/control-plane/pkg/kafka/topic.go @@ -24,6 +24,7 @@ import ( "go.uber.org/zap" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/pointer" "knative.dev/pkg/configmap" ) @@ -31,6 +32,7 @@ const ( DefaultTopicNumPartitionConfigMapKey = "default.topic.partitions" DefaultTopicReplicationFactorConfigMapKey = "default.topic.replication.factor" BootstrapServersConfigMapKey = "bootstrap.servers" + DefaultTopicConfigPrefix = "default.topic.config." GroupIDConfigMapKey = "group.id" @@ -97,6 +99,15 @@ func buildTopicConfigFromConfigMap(cm *corev1.ConfigMap) (*TopicConfig, error) { return nil, fmt.Errorf("failed to parse config map %s/%s: %w", cm.Namespace, cm.Name, err) } + for k, v := range cm.Data { + if s := strings.TrimPrefix(k, DefaultTopicConfigPrefix); s != k { + if topicDetail.ConfigEntries == nil { + topicDetail.ConfigEntries = make(map[string]*string) + } + topicDetail.ConfigEntries[s] = pointer.String(v) + } + } + topicDetail.ReplicationFactor = int16(replicationFactor) config := &TopicConfig{ diff --git a/control-plane/pkg/kafka/topic_test.go b/control-plane/pkg/kafka/topic_test.go index 6f485f6d29..3b27768c94 100644 --- a/control-plane/pkg/kafka/topic_test.go +++ b/control-plane/pkg/kafka/topic_test.go @@ -27,6 +27,7 @@ import ( "go.uber.org/zap" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/pointer" eventing "knative.dev/eventing/pkg/apis/eventing/v1" kafkatesting "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/testing" @@ -441,6 +442,27 @@ func TestTopicConfigFromConfigMap(t *testing.T) { BootstrapServers: []string{"server1:9092", "server2:9092"}, }, }, + { + name: "All valid, config options provided", + data: map[string]string{ + "default.topic.partitions": "5", + "default.topic.replication.factor": "8", + "bootstrap.servers": "server1:9092, server2:9092", + "default.topic.config.retention.ms": "3600", + "default.topic.config.max.message.bytes": "68000", + }, + want: TopicConfig{ + TopicDetail: sarama.TopicDetail{ + NumPartitions: 5, + ReplicationFactor: 8, + ConfigEntries: map[string]*string{ + "retention.ms": pointer.String("3600"), + "max.message.bytes": pointer.String("68000"), + }, + }, + BootstrapServers: []string{"server1:9092", "server2:9092"}, + }, + }, { name: "Missing keys 'default.topic.partitions' - not allowed", data: map[string]string{