diff --git a/extkafka/partition_attack_elect_new_leader.go b/extkafka/partition_attack_elect_new_leader.go index a40277f..871b3bf 100644 --- a/extkafka/partition_attack_elect_new_leader.go +++ b/extkafka/partition_attack_elect_new_leader.go @@ -30,8 +30,8 @@ func (f kafkaBrokerElectNewLeaderAttack) NewEmptyState() KafkaBrokerAttackState func (f kafkaBrokerElectNewLeaderAttack) Describe() action_kit_api.ActionDescription { return action_kit_api.ActionDescription{ Id: fmt.Sprintf("%s.elect-new-leader", kafkaTopicTargetId), - Label: "Trigger Partition Leader Election", - Description: "Trigger election for a new leader for a given topic and partition(s)", + Label: "Elect New Partition Leader", + Description: "Elect a new leader for a given topic and partition(s), the leader must be unavailable for the election to succeed.", Version: extbuild.GetSemverVersionStringOrUnknown(), Icon: extutil.Ptr(kafkaIcon), TargetSelection: extutil.Ptr(action_kit_api.TargetSelection{ @@ -73,6 +73,7 @@ func (f kafkaBrokerElectNewLeaderAttack) Prepare(_ context.Context, state *Kafka } func (f kafkaBrokerElectNewLeaderAttack) Start(ctx context.Context, state *KafkaBrokerAttackState) (*action_kit_api.StartResult, error) { + messages := make([]action_kit_api.Message, 0) client, err := createNewAdminClient() if err != nil { return nil, fmt.Errorf("failed to initialize kafka client: %s", err.Error()) @@ -98,20 +99,21 @@ func (f kafkaBrokerElectNewLeaderAttack) Start(ctx context.Context, state *Kafka for t, parts := range results { for partition, result := range parts { if result.Err != nil { - return nil, fmt.Errorf("error electing leader for topic '%s', partition %d: %v", - t, partition, result.Err) + messages = append(messages, action_kit_api.Message{ + Level: extutil.Ptr(action_kit_api.Warn), + Message: fmt.Sprintf("Error while electing leader for topic '%s', partition %d, error is: %s", t, partition, result.Err.Error()), + }) } else { - fmt.Printf("successfully elected leader for topic '%s', partition %d", - t, partition) + messages = append(messages, action_kit_api.Message{ + Level: extutil.Ptr(action_kit_api.Info), + Message: fmt.Sprintf("Successfully elected leader for topic '%s', partition %d", t, partition), + }) } } } return &action_kit_api.StartResult{ - Messages: &[]action_kit_api.Message{{ - Level: extutil.Ptr(action_kit_api.Info), - Message: fmt.Sprintf("Elect new leader for topic %s and partitions %s triggered", state.Topic, state.Partitions), - }}, + Messages: &messages, }, nil } diff --git a/extkafka/topic_discovery.go b/extkafka/topic_discovery.go index a859cf8..ac836d0 100644 --- a/extkafka/topic_discovery.go +++ b/extkafka/topic_discovery.go @@ -82,6 +82,13 @@ func (r *kafkaTopicDiscovery) DescribeAttributes() []discovery_kit_api.Attribute Other: "Kafka topic partitions", }, }, + { + Attribute: "kafka.topic.partitions-leaders", + Label: discovery_kit_api.PluralLabel{ + One: "Kafka topic partitions", + Other: "Kafka topic partitions", + }, + }, { Attribute: "kafka.topic.replication-factor", Label: discovery_kit_api.PluralLabel{ @@ -124,14 +131,20 @@ func toTopicTarget(topic kadm.TopicDetail) discovery_kit_api.Target { label := topic.Topic partitions := make([]string, len(topic.Partitions)) + partitionsLeaders := make([]string, len(topic.Partitions)) + + for i, partDetail := range topic.Partitions.Sorted() { + partitions[i] = strconv.FormatInt(int64(partDetail.Partition), 10) + } - for i, v := range topic.Partitions.Numbers() { - partitions[i] = strconv.FormatInt(int64(v), 10) + for i, partDetail := range topic.Partitions.Sorted() { + partitionsLeaders[i] = strconv.FormatInt(int64(partDetail.Partition), 10) + "->leader=" + strconv.FormatInt(int64(partDetail.Leader), 10) } attributes := make(map[string][]string) attributes["kafka.topic.name"] = []string{topic.Topic} attributes["kafka.topic.partitions"] = partitions + attributes["kafka.topic.partitions-leaders"] = partitionsLeaders attributes["kafka.topic.replication-factor"] = []string{fmt.Sprintf("%v", topic.Partitions.NumReplicas())} return discovery_kit_api.Target{