diff --git a/extkafka/common.go b/extkafka/common.go index 687174b..2cc11e6 100644 --- a/extkafka/common.go +++ b/extkafka/common.go @@ -19,11 +19,9 @@ import ( ) const ( - kafkaBrokerTargetId = "com.steadybit.extension_kafka.broker" - kafkaConsumerTargetId = "com.steadybit.extension_kafka.consumer" - kafkaTopicTargetId = "com.steadybit.extension_kafka.topic" - TargetIDProducePeriodically = "com.steadybit.extension_kafka.produce.periodically" - TargetIDProduceFixedAmount = "com.steadybit.extension_kafka.produce.fixed_amount" + kafkaBrokerTargetId = "com.steadybit.extension_kafka.broker" + kafkaConsumerTargetId = "com.steadybit.extension_kafka.consumer" + kafkaTopicTargetId = "com.steadybit.extension_kafka.topic" ) const ( @@ -47,6 +45,7 @@ type KafkaBrokerAttackState struct { ExecutionID uuid.UUID RecordHeaders map[string]string ConsumerGroup string + ElectLeadersHow string } var ( diff --git a/extkafka/partition_attack_elect_new_leader.go b/extkafka/partition_attack_elect_new_leader.go index 9c25ed0..999c320 100644 --- a/extkafka/partition_attack_elect_new_leader.go +++ b/extkafka/partition_attack_elect_new_leader.go @@ -62,6 +62,25 @@ func (f kafkaBrokerElectNewLeaderAttack) Describe() action_kit_api.ActionDescrip }), Order: extutil.Ptr(2), }, + { + Name: "election_method", + Label: "How to elect the new leader, 'Elect Live Replica' elects the first life replica if there are no in-sync replicas (i.e., this is unclean leader election)", + Description: extutil.Ptr("One or more partitions to trigger a new leader election"), + Type: action_kit_api.String, + Required: extutil.Ptr(true), + Options: extutil.Ptr([]action_kit_api.ParameterOption{ + action_kit_api.ExplicitParameterOption{ + Label: "Elect Preferred Replica", + Value: "0", + }, + action_kit_api.ExplicitParameterOption{ + Label: "Elect Live Replica (needs Unclean leader enabled)", + Value: "1", + }, + }), + Order: extutil.Ptr(2), + DefaultValue: extutil.Ptr("0"), + }, }, } } @@ -69,6 +88,7 @@ func (f kafkaBrokerElectNewLeaderAttack) Describe() action_kit_api.ActionDescrip func (f kafkaBrokerElectNewLeaderAttack) Prepare(_ context.Context, state *KafkaBrokerAttackState, request action_kit_api.PrepareActionRequestBody) (*action_kit_api.PrepareResult, error) { state.Topic = extutil.MustHaveValue(request.Target.Attributes, "kafka.topic.name")[0] state.Partitions = extutil.ToStringArray(request.Config["partitions"]) + state.ElectLeadersHow = extutil.ToString(request.Config["election_method"]) return nil, nil } @@ -91,18 +111,26 @@ func (f kafkaBrokerElectNewLeaderAttack) Start(ctx context.Context, state *Kafka topicSet.Add(state.Topic, partitions...) - results, err := client.ElectLeaders(ctx, kadm.ElectPreferredReplica, topicSet) + var electionMethod int8 + if state.ElectLeadersHow == fmt.Sprintf("%d", kadm.ElectPreferredReplica) { + electionMethod = int8(kadm.ElectPreferredReplica) + } else { + electionMethod = int8(kadm.ElectLiveReplica) + } + + results, err := client.ElectLeaders(ctx, kadm.ElectLeadersHow(electionMethod), topicSet) if err != nil { return nil, fmt.Errorf("failed to elect new leader for topic %s and partitions %s: %s", state.Topic, state.Partitions, err) } - + var errorElectLeader action_kit_api.ActionKitError for t, parts := range results { for partition, result := range parts { if result.Err != nil { messages = append(messages, action_kit_api.Message{ - Level: extutil.Ptr(action_kit_api.Error), + Level: extutil.Ptr(action_kit_api.Warn), Message: fmt.Sprintf("Error while electing leader for topic '%s', partition %d, error is: %s", t, partition, result.Err.Error()), }) + errorElectLeader = action_kit_api.ActionKitError{Title: fmt.Sprintf("Election failed for partition %d", partition), Detail: extutil.Ptr(result.Err.Error())} } else { messages = append(messages, action_kit_api.Message{ Level: extutil.Ptr(action_kit_api.Info), @@ -114,6 +142,7 @@ func (f kafkaBrokerElectNewLeaderAttack) Start(ctx context.Context, state *Kafka return &action_kit_api.StartResult{ Messages: &messages, + Error: &errorElectLeader, }, nil }