From 352e31168e8c9d9606f6b0f82c10838ef3c479dd Mon Sep 17 00:00:00 2001 From: "antoine.choimet" Date: Thu, 21 Nov 2024 12:35:33 +0100 Subject: [PATCH] fix: add partitions attributes --- extkafka/partition_attack_elect_new_leader.go | 2 +- extkafka/topic_discovery.go | 28 +++++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/extkafka/partition_attack_elect_new_leader.go b/extkafka/partition_attack_elect_new_leader.go index 2bcc565..9c25ed0 100644 --- a/extkafka/partition_attack_elect_new_leader.go +++ b/extkafka/partition_attack_elect_new_leader.go @@ -91,7 +91,7 @@ func (f kafkaBrokerElectNewLeaderAttack) Start(ctx context.Context, state *Kafka topicSet.Add(state.Topic, partitions...) - results, err := client.ElectLeaders(ctx, kadm.ElectLiveReplica, topicSet) + results, err := client.ElectLeaders(ctx, kadm.ElectPreferredReplica, topicSet) if err != nil { return nil, fmt.Errorf("failed to elect new leader for topic %s and partitions %s: %s", state.Topic, state.Partitions, err) } diff --git a/extkafka/topic_discovery.go b/extkafka/topic_discovery.go index 62181fb..b22ae14 100644 --- a/extkafka/topic_discovery.go +++ b/extkafka/topic_discovery.go @@ -54,6 +54,8 @@ func (r *kafkaTopicDiscovery) DescribeTarget() discovery_kit_api.TargetDescripti {Attribute: "steadybit.label"}, {Attribute: "kafka.topic.name"}, {Attribute: "kafka.topic.partitions-leaders"}, + {Attribute: "kafka.topic.partitions-replicas"}, + {Attribute: "kafka.topic.partitions-isr"}, {Attribute: "kafka.topic.replication-factor"}, }, OrderBy: []discovery_kit_api.OrderBy{ @@ -89,6 +91,20 @@ func (r *kafkaTopicDiscovery) DescribeAttributes() []discovery_kit_api.Attribute Other: "Kafka topic partitions leaders", }, }, + { + Attribute: "kafka.topic.partitions-replicas", + Label: discovery_kit_api.PluralLabel{ + One: "Kafka topic partitions replicas", + Other: "Kafka topic partitions replicas", + }, + }, + { + Attribute: "kafka.topic.partitions-isr", + Label: discovery_kit_api.PluralLabel{ + One: "Kafka topic partitions in-sync-replicas", + Other: "Kafka topic partitions in-sync-replicas", + }, + }, { Attribute: "kafka.topic.replication-factor", Label: discovery_kit_api.PluralLabel{ @@ -132,6 +148,8 @@ func toTopicTarget(topic kadm.TopicDetail) discovery_kit_api.Target { partitions := make([]string, len(topic.Partitions)) partitionsLeaders := make([]string, len(topic.Partitions)) + partitionsReplicas := make([]string, len(topic.Partitions)) + partitionsInSyncReplicas := make([]string, len(topic.Partitions)) for i, partDetail := range topic.Partitions.Sorted() { partitions[i] = strconv.FormatInt(int64(partDetail.Partition), 10) @@ -141,10 +159,20 @@ func toTopicTarget(topic kadm.TopicDetail) discovery_kit_api.Target { partitionsLeaders[i] = strconv.FormatInt(int64(partDetail.Partition), 10) + "->leader=" + strconv.FormatInt(int64(partDetail.Leader), 10) } + for i, partDetail := range topic.Partitions.Sorted() { + partitionsReplicas[i] = strconv.FormatInt(int64(partDetail.Partition), 10) + "->replicas=" + fmt.Sprintf("%v", partDetail.Replicas) + } + + for i, partDetail := range topic.Partitions.Sorted() { + partitionsInSyncReplicas[i] = strconv.FormatInt(int64(partDetail.Partition), 10) + "->in-sync-replicas=" + fmt.Sprintf("%v", partDetail.ISR) + } + attributes := make(map[string][]string) attributes["kafka.topic.name"] = []string{topic.Topic} attributes["kafka.topic.partitions"] = partitions attributes["kafka.topic.partitions-leaders"] = partitionsLeaders + attributes["kafka.topic.partitions-replicas"] = partitionsReplicas + attributes["kafka.topic.partitions-isr"] = partitionsReplicas attributes["kafka.topic.replication-factor"] = []string{fmt.Sprintf("%v", topic.Partitions.NumReplicas())} return discovery_kit_api.Target{