Skip to content

Commit

Permalink
feat: add election method to action
Browse files Browse the repository at this point in the history
  • Loading branch information
achoimet committed Nov 21, 2024
1 parent 352e311 commit 4632685
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 8 deletions.
9 changes: 4 additions & 5 deletions extkafka/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@ import (
)

const (
kafkaBrokerTargetId = "com.steadybit.extension_kafka.broker"
kafkaConsumerTargetId = "com.steadybit.extension_kafka.consumer"
kafkaTopicTargetId = "com.steadybit.extension_kafka.topic"
TargetIDProducePeriodically = "com.steadybit.extension_kafka.produce.periodically"
TargetIDProduceFixedAmount = "com.steadybit.extension_kafka.produce.fixed_amount"
kafkaBrokerTargetId = "com.steadybit.extension_kafka.broker"
kafkaConsumerTargetId = "com.steadybit.extension_kafka.consumer"
kafkaTopicTargetId = "com.steadybit.extension_kafka.topic"
)

const (
Expand All @@ -47,6 +45,7 @@ type KafkaBrokerAttackState struct {
ExecutionID uuid.UUID
RecordHeaders map[string]string
ConsumerGroup string
ElectLeadersHow string
}

var (
Expand Down
35 changes: 32 additions & 3 deletions extkafka/partition_attack_elect_new_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,33 @@ func (f kafkaBrokerElectNewLeaderAttack) Describe() action_kit_api.ActionDescrip
}),
Order: extutil.Ptr(2),
},
{
Name: "election_method",
Label: "How to elect the new leader, 'Elect Live Replica' elects the first life replica if there are no in-sync replicas (i.e., this is unclean leader election)",
Description: extutil.Ptr("One or more partitions to trigger a new leader election"),
Type: action_kit_api.String,
Required: extutil.Ptr(true),
Options: extutil.Ptr([]action_kit_api.ParameterOption{
action_kit_api.ExplicitParameterOption{
Label: "Elect Preferred Replica",
Value: "0",
},
action_kit_api.ExplicitParameterOption{
Label: "Elect Live Replica (needs Unclean leader enabled)",
Value: "1",
},
}),
Order: extutil.Ptr(2),
DefaultValue: extutil.Ptr("0"),
},
},
}
}

func (f kafkaBrokerElectNewLeaderAttack) Prepare(_ context.Context, state *KafkaBrokerAttackState, request action_kit_api.PrepareActionRequestBody) (*action_kit_api.PrepareResult, error) {
state.Topic = extutil.MustHaveValue(request.Target.Attributes, "kafka.topic.name")[0]
state.Partitions = extutil.ToStringArray(request.Config["partitions"])
state.ElectLeadersHow = extutil.ToString(request.Config["election_method"])
return nil, nil
}

Expand All @@ -91,18 +111,26 @@ func (f kafkaBrokerElectNewLeaderAttack) Start(ctx context.Context, state *Kafka

topicSet.Add(state.Topic, partitions...)

results, err := client.ElectLeaders(ctx, kadm.ElectPreferredReplica, topicSet)
var electionMethod int8
if state.ElectLeadersHow == fmt.Sprintf("%d", kadm.ElectPreferredReplica) {
electionMethod = int8(kadm.ElectPreferredReplica)
} else {
electionMethod = int8(kadm.ElectLiveReplica)
}

results, err := client.ElectLeaders(ctx, kadm.ElectLeadersHow(electionMethod), topicSet)
if err != nil {
return nil, fmt.Errorf("failed to elect new leader for topic %s and partitions %s: %s", state.Topic, state.Partitions, err)
}

var errorElectLeader action_kit_api.ActionKitError
for t, parts := range results {
for partition, result := range parts {
if result.Err != nil {
messages = append(messages, action_kit_api.Message{
Level: extutil.Ptr(action_kit_api.Error),
Level: extutil.Ptr(action_kit_api.Warn),
Message: fmt.Sprintf("Error while electing leader for topic '%s', partition %d, error is: %s", t, partition, result.Err.Error()),
})
errorElectLeader = action_kit_api.ActionKitError{Title: fmt.Sprintf("Election failed for partition %d", partition), Detail: extutil.Ptr(result.Err.Error())}
} else {
messages = append(messages, action_kit_api.Message{
Level: extutil.Ptr(action_kit_api.Info),
Expand All @@ -114,6 +142,7 @@ func (f kafkaBrokerElectNewLeaderAttack) Start(ctx context.Context, state *Kafka

return &action_kit_api.StartResult{
Messages: &messages,
Error: &errorElectLeader,
}, nil

}
Expand Down

0 comments on commit 4632685

Please sign in to comment.