Skip to content

Commit

Permalink
elect new leader fix
Browse files Browse the repository at this point in the history
  • Loading branch information
achoimet committed Oct 3, 2024
1 parent 4065498 commit 106d603
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 24 deletions.
78 changes: 55 additions & 23 deletions extkafka/broker_attack_elect_new_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/steadybit/extension-kit/extutil"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
"strconv"
"time"
)

Expand All @@ -31,18 +32,18 @@ func (f kafkaBrokerElectNewLeaderAttack) NewEmptyState() KafkaBrokerAttackState

func (f kafkaBrokerElectNewLeaderAttack) Describe() action_kit_api.ActionDescription {
return action_kit_api.ActionDescription{
Id: fmt.Sprintf("%s.failover", kafkaBrokerTargetId),
Label: "Trigger Failover",
Description: "Triggers nodegroup failover by promoting a replica node to primary",
Id: fmt.Sprintf("%s.elect-new-leader", kafkaTopicTargetId),
Label: "Trigger Election for new leader",
Description: "Triggers election for a new leader for a given topic and partition(s)",
Version: extbuild.GetSemverVersionStringOrUnknown(),
Icon: extutil.Ptr(kafkaIcon),
TargetSelection: extutil.Ptr(action_kit_api.TargetSelection{
TargetType: kafkaBrokerTargetId,
SelectionTemplates: extutil.Ptr([]action_kit_api.TargetSelectionTemplate{
{
Label: "by elasticache nodegroup id",
Description: extutil.Ptr("Find node groups by replication group id and node group id"),
Query: "aws.elasticache.replication-group.id=\"\" and aws.elasticache.replication-group.node-group.id=\"\"",
Label: "by topic id",
Description: extutil.Ptr("Find topic by id"),
Query: "kafka.topic.id=\"\"",
},
}),
}),
Expand All @@ -51,29 +52,28 @@ func (f kafkaBrokerElectNewLeaderAttack) Describe() action_kit_api.ActionDescrip
Kind: action_kit_api.Attack,
Parameters: []action_kit_api.ActionParameter{
{
Name: "Topic",
Label: "Topic to elect new leader",
Description: extutil.Ptr("List of topics to elect a new leader, if none, will impact all topics."),
Type: action_kit_api.String,
DefaultValue: extutil.Ptr("1"),
Required: extutil.Ptr(false),
Order: extutil.Ptr(1),
Name: "Topic",
Label: "Topic of the partition(s) to elect new leader",
Description: extutil.Ptr("Topic where the partition must have a new leader"),
Type: action_kit_api.String,
Required: extutil.Ptr(true),
Order: extutil.Ptr(1),
},
{
Name: "Partition",
Label: "Partition to elect new leader",
Description: extutil.Ptr("List of topics to elect a new leader, if none, will impact all topics."),
Type: action_kit_api.String,
DefaultValue: extutil.Ptr("1"),
Required: extutil.Ptr(false),
Order: extutil.Ptr(1),
Name: "Partition",
Label: "Partition to elect new leader",
Description: extutil.Ptr("One or more partitions to trigger a new leader election"),
Type: action_kit_api.StringArray,
Required: extutil.Ptr(false),
Order: extutil.Ptr(2),
},
},
}
}

func (f kafkaBrokerElectNewLeaderAttack) Prepare(_ context.Context, state *KafkaBrokerAttackState, request action_kit_api.PrepareActionRequestBody) (*action_kit_api.PrepareResult, error) {
state.Topic = extutil.MustHaveValue(request.Target.Attributes, "kafka.topic.id")[0]
state.Topic = extutil.MustHaveValue(request.Target.Attributes, "kafka.topic.name")[0]
state.Partitions = extutil.ToStringArray(request.Config["partitions"])
return nil, nil
}

Expand All @@ -94,14 +94,34 @@ func (f kafkaBrokerElectNewLeaderAttack) Start(ctx context.Context, state *Kafka
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()

// Parse partitions
partitions, err := sliceAtoi(state.Partitions)
if err != nil {
return nil, fmt.Errorf("failed to parse partitions: %s", err.Error())
}

// Create a slice of TopicPartition
topicSet := make(kadm.TopicsSet)
topicSet.Add(state.Topic, state.Partitions...)

_, err = adminClient.ElectLeaders(ctx, kadm.ElectPreferredReplica, topicSet)
topicSet.Add(state.Topic, partitions...)

results, err := adminClient.ElectLeaders(ctx, kadm.ElectPreferredReplica, topicSet)
if err != nil {
return nil, fmt.Errorf("failed to elect new leader for topic %s and partitions %d: %s", state.Topic, state.Partitions, err)

Check failure on line 110 in extkafka/broker_attack_elect_new_leader.go

View workflow job for this annotation

GitHub Actions / extension-ci / Audit

fmt.Errorf format %d has arg state.Partitions of wrong type []string
}

for topic, partitions := range results {
for partition, result := range partitions {
if result.Err != nil {
fmt.Printf("Error electing leader for topic '%s', partition %d: %v\n",
topic, partition, result.Err)
} else {
fmt.Printf("Successfully elected leader for topic '%s', partition %d\n",
topic, partition)
}
}
}

return &action_kit_api.StartResult{
Messages: &[]action_kit_api.Message{{
Level: extutil.Ptr(action_kit_api.Info),
Expand All @@ -110,3 +130,15 @@ func (f kafkaBrokerElectNewLeaderAttack) Start(ctx context.Context, state *Kafka
}, nil

}

func sliceAtoi(sa []string) ([]int32, error) {
si := make([]int32, 0, len(sa))
for _, a := range sa {
i, err := strconv.Atoi(a)
if err != nil {
return si, err
}
si = append(si, int32(i))
}
return si, nil
}
2 changes: 1 addition & 1 deletion extkafka/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const (
type KafkaBrokerAttackState struct {
NodeID string
Topic string
Partitions []int32
Partitions []string
DelayBetweenRequestsInMS int64
SuccessRate int
ResponseTimeMode string
Expand Down
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func registerHandlers(ctx context.Context) {
discovery_kit_sdk.Register(extkafka.NewKafkaTopicDiscovery(ctx))
action_kit_sdk.RegisterAction(extkafka.NewProduceMessageActionPeriodically())
action_kit_sdk.RegisterAction(extkafka.NewProduceMessageActionFixedAmount())
action_kit_sdk.RegisterAction(extkafka.NewKafkaBrokerElectNewLeaderAttack())

exthttp.RegisterHttpHandler("/", exthttp.GetterAsHandler(getExtensionList))
}
Expand Down

0 comments on commit 106d603

Please sign in to comment.