Skip to content

Commit

Permalink
fix: add leader info to partitions on topic target, fix error handlin…
Browse files Browse the repository at this point in the history
…g of elect new leader attack
  • Loading branch information
achoimet committed Nov 21, 2024
1 parent 4e2bf11 commit 29d1500
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 12 deletions.
22 changes: 12 additions & 10 deletions extkafka/partition_attack_elect_new_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ func (f kafkaBrokerElectNewLeaderAttack) NewEmptyState() KafkaBrokerAttackState
func (f kafkaBrokerElectNewLeaderAttack) Describe() action_kit_api.ActionDescription {
return action_kit_api.ActionDescription{
Id: fmt.Sprintf("%s.elect-new-leader", kafkaTopicTargetId),
Label: "Trigger Partition Leader Election",
Description: "Trigger election for a new leader for a given topic and partition(s)",
Label: "Elect New Partition Leader",
Description: "Elect a new leader for a given topic and partition(s), the leader must be unavailable for the election to succeed.",
Version: extbuild.GetSemverVersionStringOrUnknown(),
Icon: extutil.Ptr(kafkaIcon),
TargetSelection: extutil.Ptr(action_kit_api.TargetSelection{
Expand Down Expand Up @@ -73,6 +73,7 @@ func (f kafkaBrokerElectNewLeaderAttack) Prepare(_ context.Context, state *Kafka
}

func (f kafkaBrokerElectNewLeaderAttack) Start(ctx context.Context, state *KafkaBrokerAttackState) (*action_kit_api.StartResult, error) {
messages := make([]action_kit_api.Message, 0)
client, err := createNewAdminClient()
if err != nil {
return nil, fmt.Errorf("failed to initialize kafka client: %s", err.Error())
Expand All @@ -98,20 +99,21 @@ func (f kafkaBrokerElectNewLeaderAttack) Start(ctx context.Context, state *Kafka
for t, parts := range results {
for partition, result := range parts {
if result.Err != nil {
return nil, fmt.Errorf("error electing leader for topic '%s', partition %d: %v",
t, partition, result.Err)
messages = append(messages, action_kit_api.Message{
Level: extutil.Ptr(action_kit_api.Warn),
Message: fmt.Sprintf("Error while electing leader for topic '%s', partition %d, error is: %s", t, partition, result.Err.Error()),
})
} else {
fmt.Printf("successfully elected leader for topic '%s', partition %d",
t, partition)
messages = append(messages, action_kit_api.Message{
Level: extutil.Ptr(action_kit_api.Info),
Message: fmt.Sprintf("Successfully elected leader for topic '%s', partition %d", t, partition),
})
}
}
}

return &action_kit_api.StartResult{
Messages: &[]action_kit_api.Message{{
Level: extutil.Ptr(action_kit_api.Info),
Message: fmt.Sprintf("Elect new leader for topic %s and partitions %s triggered", state.Topic, state.Partitions),
}},
Messages: &messages,
}, nil

}
Expand Down
17 changes: 15 additions & 2 deletions extkafka/topic_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ func (r *kafkaTopicDiscovery) DescribeAttributes() []discovery_kit_api.Attribute
Other: "Kafka topic partitions",
},
},
{
Attribute: "kafka.topic.partitions-leaders",
Label: discovery_kit_api.PluralLabel{
One: "Kafka topic partitions",
Other: "Kafka topic partitions",
},
},
{
Attribute: "kafka.topic.replication-factor",
Label: discovery_kit_api.PluralLabel{
Expand Down Expand Up @@ -124,14 +131,20 @@ func toTopicTarget(topic kadm.TopicDetail) discovery_kit_api.Target {
label := topic.Topic

partitions := make([]string, len(topic.Partitions))
partitionsLeaders := make([]string, len(topic.Partitions))

for i, partDetail := range topic.Partitions.Sorted() {
partitions[i] = strconv.FormatInt(int64(partDetail.Partition), 10)
}

for i, v := range topic.Partitions.Numbers() {
partitions[i] = strconv.FormatInt(int64(v), 10)
for i, partDetail := range topic.Partitions.Sorted() {
partitionsLeaders[i] = strconv.FormatInt(int64(partDetail.Partition), 10) + "->leader=" + strconv.FormatInt(int64(partDetail.Leader), 10)
}

attributes := make(map[string][]string)
attributes["kafka.topic.name"] = []string{topic.Topic}
attributes["kafka.topic.partitions"] = partitions
attributes["kafka.topic.partitions-leaders"] = partitionsLeaders
attributes["kafka.topic.replication-factor"] = []string{fmt.Sprintf("%v", topic.Partitions.NumReplicas())}

return discovery_kit_api.Target{
Expand Down

0 comments on commit 29d1500

Please sign in to comment.