Skip to content

Commit

Permalink
some fix
Browse files Browse the repository at this point in the history
  • Loading branch information
achoimet committed Oct 8, 2024
1 parent 4f019a0 commit a78bdb6
Show file tree
Hide file tree
Showing 7 changed files with 277 additions and 25 deletions.
17 changes: 10 additions & 7 deletions extkafka/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ import (
)

const (
kafkaBrokerTargetId = "com.steadybit.extension_kafka.broker"
kafkaTopicTargetId = "com.steadybit.extension_kafka.topic"
TargetIDPeriodically = "com.steadybit.extension_kafka.produce.periodically"
TargetIDFixedAmount = "com.steadybit.extension_kafka.produce.fixed_amount"
kafkaBrokerTargetId = "com.steadybit.extension_kafka.broker"
kafkaTopicTargetId = "com.steadybit.extension_kafka.topic"
TargetIDProducePeriodically = "com.steadybit.extension_kafka.produce.periodically"
TargetIDConsumePeriodically = "com.steadybit.extension_kafka.consume.periodically"
TargetIDProduceFixedAmount = "com.steadybit.extension_kafka.produce.fixed_amount"
TargetIDConsumeFixedAmount = "com.steadybit.extension_kafka.consume.fixed_amount"
)

const (
Expand All @@ -24,6 +26,7 @@ const (
type KafkaBrokerAttackState struct {
Topic string
Partitions []string
Offset int64
DelayBetweenRequestsInMS int64
SuccessRate int
Timeout time.Time
Expand Down Expand Up @@ -79,7 +82,7 @@ var (
duration = action_kit_api.ActionParameter{
Name: "duration",
Label: "Duration",
Description: extutil.Ptr("In which timeframe should the specified requests be executed?"),
Description: extutil.Ptr("In which timeframe should the specified records be produced?"),
Type: action_kit_api.Duration,
DefaultValue: extutil.Ptr("10s"),
Required: extutil.Ptr(true),
Expand All @@ -94,7 +97,7 @@ var (
successRate = action_kit_api.ActionParameter{
Name: "successRate",
Label: "Required Success Rate",
Description: extutil.Ptr("How many percent of the Request must be at least successful (in terms of the following response verifications) to continue the experiment execution? The result will be evaluated and the end of the given duration."),
Description: extutil.Ptr("How many percent of the records must be at least successful (in terms of the following response verifications) to continue the experiment execution? The result will be evaluated and the end of the given duration."),
Type: action_kit_api.Percentage,
DefaultValue: extutil.Ptr("100"),
Required: extutil.Ptr(true),
Expand All @@ -105,7 +108,7 @@ var (
maxConcurrent = action_kit_api.ActionParameter{
Name: "maxConcurrent",
Label: "Max concurrent requests",
Description: extutil.Ptr("Maximum count on parallel running requests. (min 1, max 10)"),
Description: extutil.Ptr("Maximum count on parallel producing requests. (min 1, max 10)"),
Type: action_kit_api.Integer,
DefaultValue: extutil.Ptr("5"),
Required: extutil.Ptr(true),
Expand Down
169 changes: 169 additions & 0 deletions extkafka/consumeFixAmount.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/*
* Copyright 2023 steadybit GmbH. All rights reserved.
*/

package extkafka

import (
"context"
"errors"
"github.com/google/uuid"
"github.com/rs/zerolog/log"
"github.com/steadybit/action-kit/go/action_kit_api/v2"
"github.com/steadybit/action-kit/go/action_kit_sdk"
"github.com/steadybit/extension-kit/extbuild"
"github.com/steadybit/extension-kit/extutil"
)

type consumeMessageActionFixedAmount struct{}

// Make sure Action implements all required interfaces
var (
_ action_kit_sdk.Action[KafkaBrokerAttackState] = (*consumeMessageActionFixedAmount)(nil)
_ action_kit_sdk.ActionWithStatus[KafkaBrokerAttackState] = (*consumeMessageActionFixedAmount)(nil)

_ action_kit_sdk.ActionWithStop[KafkaBrokerAttackState] = (*consumeMessageActionFixedAmount)(nil)
)

func NewConsumeMessageActionFixedAmount() action_kit_sdk.Action[KafkaBrokerAttackState] {
return &consumeMessageActionFixedAmount{}
}

func (l *consumeMessageActionFixedAmount) NewEmptyState() KafkaBrokerAttackState {
return KafkaBrokerAttackState{}
}

// Describe returns the action description for the platform with all required information.
func (l *consumeMessageActionFixedAmount) Describe() action_kit_api.ActionDescription {
return action_kit_api.ActionDescription{
Id: TargetIDConsumeFixedAmount,
Label: "Consume X records",
Description: "Consume a certain amount of kafka messages for a given duration",
Version: extbuild.GetSemverVersionStringOrUnknown(),
Icon: extutil.Ptr(kafkaMessageFixedAmount),
TargetSelection: extutil.Ptr(action_kit_api.TargetSelection{
TargetType: kafkaTopicTargetId,
SelectionTemplates: extutil.Ptr([]action_kit_api.TargetSelectionTemplate{
{
Label: "by topic name",
Description: extutil.Ptr("Find topic by name"),
Query: "kafka.topic.name=\"\"",
},
}),
}),
//Widgets: extutil.Ptr([]action_kit_api.Widget{
// action_kit_api.PredefinedWidget{
// Type: action_kit_api.ComSteadybitWidgetPredefined,
// PredefinedWidgetId: "com.steadybit.widget.predefined.HttpCheck",
// },
//}),

// Technology for the targets to appear in
Technology: extutil.Ptr("Kafka"),

// To clarify the purpose of the action:
// Check: Will perform checks on the targets
Kind: action_kit_api.Check,

// How the action is controlled over time.
// External: The agent takes care and calls stop then the time has passed. Requires a duration parameter. Use this when the duration is known in advance.
// Internal: The action has to implement the status endpoint to signal when the action is done. Use this when the duration is not known in advance.
// Instantaneous: The action is done immediately. Use this for actions that happen immediately, e.g. a reboot.
TimeControl: action_kit_api.TimeControlInternal,

// The parameters for the action
Parameters: []action_kit_api.ActionParameter{
//------------------------
// Request Definition
//------------------------
{
Name: "-",
Label: "-",
Type: action_kit_api.Separator,
Order: extutil.Ptr(5),
},
//------------------------
// Repitions
//------------------------
repetitionControl,
{
Name: "numberOfRecords",
Label: "Number of Records.",
Description: extutil.Ptr("Fixed number of Records, distributed to given duration"),
Type: action_kit_api.Integer,
Required: extutil.Ptr(true),
DefaultValue: extutil.Ptr("1"),
Order: extutil.Ptr(7),
},
duration,
{
Name: "-",
Label: "-",
Type: action_kit_api.Separator,
Order: extutil.Ptr(9),
},
//------------------------
// Result Verification
//------------------------
resultVerification,
successRate,

//------------------------
// Additional Settings
//------------------------

maxConcurrent,
},
Status: extutil.Ptr(action_kit_api.MutatingEndpointReferenceWithCallInterval{
CallInterval: extutil.Ptr("1s"),
}),
Stop: extutil.Ptr(action_kit_api.MutatingEndpointReference{}),
}
}

func (l *consumeMessageActionFixedAmount) Prepare(_ context.Context, state *KafkaBrokerAttackState, request action_kit_api.PrepareActionRequestBody) (*action_kit_api.PrepareResult, error) {
if extutil.ToInt64(request.Config["duration"]) == 0 {
return nil, errors.New("duration must be greater than 0")
}
state.DelayBetweenRequestsInMS = getDelayBetweenRequestsInMsFixedAmount(extutil.ToInt64(request.Config["duration"]), extutil.ToInt64(request.Config["numberOfRecords"]))
state.Topic = extutil.MustHaveValue(request.Target.Attributes, "kafka.topic.name")[0]
return prepare(false, request, state, checkEndedFixedAmount)
}

// Start is called to start the action
// You can mutate the state here.
// You can use the result to return messages/errors/metrics or artifacts
func (l *consumeMessageActionFixedAmount) Start(_ context.Context, state *KafkaBrokerAttackState) (*action_kit_api.StartResult, error) {
start(state)
return nil, nil
}

// Status is called to get the current status of the action
func (l *consumeMessageActionFixedAmount) Status(_ context.Context, state *KafkaBrokerAttackState) (*action_kit_api.StatusResult, error) {
executionRunData, err := loadExecutionRunData(state.ExecutionID)
if err != nil {
log.Error().Err(err).Msg("Failed to load execution run data")
return nil, err
}

completed := checkEndedFixedAmount(executionRunData, state)
if completed {
stopTickers(executionRunData)
log.Info().Msg("Action completed")
}

latestMetrics := retrieveLatestMetrics(executionRunData.metrics)

return &action_kit_api.StatusResult{
Completed: completed,
Metrics: extutil.Ptr(latestMetrics),
}, nil
}

func (l *consumeMessageActionFixedAmount) Stop(_ context.Context, state *KafkaBrokerAttackState) (*action_kit_api.StopResult, error) {
return stop(state)
}

func (l *consumeMessageActionFixedAmount) getExecutionRunData(executionID uuid.UUID) (*ExecutionRunData, error) {
return loadExecutionRunData(executionID)
}
17 changes: 13 additions & 4 deletions extkafka/produceFixAmount.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,21 @@ func (l *produceMessageActionFixedAmount) NewEmptyState() KafkaBrokerAttackState
// Describe returns the action description for the platform with all required information.
func (l *produceMessageActionFixedAmount) Describe() action_kit_api.ActionDescription {
return action_kit_api.ActionDescription{
Id: TargetIDFixedAmount,
Id: TargetIDProduceFixedAmount,
Label: "Produce X records",
Description: "Produce a certain amount of kafka messages for a given duration",
Version: extbuild.GetSemverVersionStringOrUnknown(),
Icon: extutil.Ptr(kafkaMessageFixedAmount),
TargetSelection: extutil.Ptr(action_kit_api.TargetSelection{
TargetType: kafkaTopicTargetId,
SelectionTemplates: extutil.Ptr([]action_kit_api.TargetSelectionTemplate{
{
Label: "by topic name",
Description: extutil.Ptr("Find topic by name"),
Query: "kafka.topic.name=\"\"",
},
}),
}),
//Widgets: extutil.Ptr([]action_kit_api.Widget{
// action_kit_api.PredefinedWidget{
// Type: action_kit_api.ComSteadybitWidgetPredefined,
Expand All @@ -66,7 +76,6 @@ func (l *produceMessageActionFixedAmount) Describe() action_kit_api.ActionDescri
//------------------------
// Request Definition
//------------------------
topic,
recordKey,
recordValue,
recordHeaders,
Expand Down Expand Up @@ -128,8 +137,8 @@ func (l *produceMessageActionFixedAmount) Prepare(_ context.Context, state *Kafk
return nil, errors.New("duration must be greater than 0")
}
state.DelayBetweenRequestsInMS = getDelayBetweenRequestsInMsFixedAmount(extutil.ToInt64(request.Config["duration"]), extutil.ToInt64(request.Config["numberOfRecords"]))

return prepare(request, state, checkEndedFixedAmount)
state.Topic = extutil.MustHaveValue(request.Target.Attributes, "kafka.topic.name")[0]
return prepare(true, request, state, checkEndedFixedAmount)
}

func checkEndedFixedAmount(executionRunData *ExecutionRunData, state *KafkaBrokerAttackState) bool {
Expand Down
17 changes: 13 additions & 4 deletions extkafka/producePeriodically.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,21 @@ func (l *produceMessageActionPeriodically) NewEmptyState() KafkaBrokerAttackStat
// Describe returns the action description for the platform with all required information.
func (l *produceMessageActionPeriodically) Describe() action_kit_api.ActionDescription {
return action_kit_api.ActionDescription{
Id: TargetIDPeriodically,
Id: TargetIDProducePeriodically,
Label: "Produce X records per second",
Description: "Produce kafka messages periodically (messages / s)",
Version: extbuild.GetSemverVersionStringOrUnknown(),
Icon: extutil.Ptr(kafkaMessagePeriodically),
TargetSelection: extutil.Ptr(action_kit_api.TargetSelection{
TargetType: kafkaTopicTargetId,
SelectionTemplates: extutil.Ptr([]action_kit_api.TargetSelectionTemplate{
{
Label: "by topic name",
Description: extutil.Ptr("Find topic by name"),
Query: "kafka.topic.name=\"\"",
},
}),
}),
//Widgets: extutil.Ptr([]action_kit_api.Widget{
// action_kit_api.PredefinedWidget{
// Type: action_kit_api.ComSteadybitWidgetPredefined,
Expand All @@ -64,7 +74,6 @@ func (l *produceMessageActionPeriodically) Describe() action_kit_api.ActionDescr
//------------------------
// Request Definition
//------------------------
topic,
recordKey,
recordValue,
recordHeaders,
Expand Down Expand Up @@ -123,8 +132,8 @@ func getDelayBetweenRequestsInMsPeriodically(recordsPerSecond int64) int64 {

func (l *produceMessageActionPeriodically) Prepare(_ context.Context, state *KafkaBrokerAttackState, request action_kit_api.PrepareActionRequestBody) (*action_kit_api.PrepareResult, error) {
state.DelayBetweenRequestsInMS = getDelayBetweenRequestsInMsPeriodically(extutil.ToInt64(request.Config["recordsPerSecond"]))

return prepare(request, state, func(executionRunData *ExecutionRunData, state *KafkaBrokerAttackState) bool { return false })
state.Topic = extutil.MustHaveValue(request.Target.Attributes, "kafka.topic.name")[0]
return prepare(true, request, state, func(executionRunData *ExecutionRunData, state *KafkaBrokerAttackState) bool { return false })
}

// Start is called to start the action
Expand Down
Loading

0 comments on commit a78bdb6

Please sign in to comment.