From 6c7dd009df5c3a9d3168a7d143f422bfe31ab511 Mon Sep 17 00:00:00 2001 From: David Porter Date: Mon, 29 May 2023 23:47:40 -0700 Subject: [PATCH 1/3] Adding activity failures test --- Makefile | 10 ++ .../recipes/localactivityfailure/README.md | 27 ++++ .../recipes/localactivityfailure/failure.go | 138 ++++++++++++++++++ .../recipes/localactivityfailure/main.go | 76 ++++++++++ 4 files changed, 251 insertions(+) create mode 100644 cmd/samples/recipes/localactivityfailure/README.md create mode 100644 cmd/samples/recipes/localactivityfailure/failure.go create mode 100644 cmd/samples/recipes/localactivityfailure/main.go diff --git a/Makefile b/Makefile index b95eaed..508e846 100644 --- a/Makefile +++ b/Makefile @@ -18,6 +18,7 @@ PROGS = helloworld \ splitmerge \ timer \ localactivity \ + localactivityfailure \ query \ consistentquery \ cron \ @@ -111,6 +112,9 @@ timer: localactivity: go build -o bin/localactivity cmd/samples/recipes/localactivity/*.go +localactivityfailure: + go build -o bin/localactivityfailure cmd/samples/recipes/localactivityfailure/*.go + query: go build -o bin/query cmd/samples/recipes/query/*.go @@ -153,6 +157,11 @@ signalcounter: crossdomain: go build -o bin/crossdomain cmd/samples/recipes/crossdomain/*.go + +setup: + # use the ..cadence-server --env development_xdc_cluster0 ... to set up three + cadence --ad 127.0.0.1:7933 --env development --do samples-domain domain register --ac cluster0 --gd true + crossdomain-setup: # use the ..cadence-server --env development_xdc_cluster0 ... 
to set up three cadence --ad 127.0.0.1:7933 --env development --do domain0 domain register --ac cluster0 --gd true --clusters cluster0 cluster1 # global domain required @@ -188,6 +197,7 @@ bins: helloworld \ expense_dummy \ expense \ localactivity \ + localactivityfailure \ query \ consistentquery \ recovery \ diff --git a/cmd/samples/recipes/localactivityfailure/README.md b/cmd/samples/recipes/localactivityfailure/README.md new file mode 100644 index 0000000..2ca491d --- /dev/null +++ b/cmd/samples/recipes/localactivityfailure/README.md @@ -0,0 +1,27 @@ +This sample workflow demos how to use local activity to execute short/quick operations efficiently. + +local_activity_workflow.go shows how to use local activity +local_activity_workflow_test.go shows how to unit-test workflow with local activity + +Steps to run this sample: +1) You need a cadence service running. See details in cmd/samples/README.md +2) Run the following command to start worker +``` +./bin/localactivityfailure -m worker +``` +3) Run the following command to trigger a workflow execution. You should see workflowID and runID print out on screen. +``` +./bin/localactivityfailure -m trigger +``` +4) Run the following command to send signal "_1_" to the running workflow. You should see output that indicate 5 local activity has been run to check the conditions and one condition will be true which result in one activity to be scheduled. +``` +./bin/localactivityfailure -m signal -s _1_ -w +``` +5) Repeat step 4, but with different signal data, for example, send signal like _2_4_ to make 2 conditions true. +``` +./bin/localactivityfailure -m signal -s _2_4_ -w +``` +6) Run the following command this will exit the workflow. 
+``` +./bin/localactivityfailure -m signal -s exit +``` \ No newline at end of file diff --git a/cmd/samples/recipes/localactivityfailure/failure.go b/cmd/samples/recipes/localactivityfailure/failure.go new file mode 100644 index 0000000..ff2ebcb --- /dev/null +++ b/cmd/samples/recipes/localactivityfailure/failure.go @@ -0,0 +1,138 @@ +package main + +import ( + "context" + "fmt" + "go.uber.org/cadence" + "go.uber.org/zap" + "math/rand" + "sync" + "time" + + "go.uber.org/cadence/workflow" +) + +func init() { + rand.Seed(time.Now().UnixNano()) +} + +var globalMu sync.Mutex + +// a data-racy, completely problematic +var sideEffectsCapture int + +type Result struct { + Result *int +} + +func testLocalActivity(ctx workflow.Context) error { + log := workflow.GetLogger(ctx) + // zero out the global variable to start with to ensure test is sane + // this workflow is only a demonstration and global variables like this are *A BAD IDEA* + // it's being expicitly used to demonstrate how the local activities can cause side-effects to be replaced + sideEffectsCapture = 0 + ao := workflow.LocalActivityOptions{ + ScheduleToCloseTimeout: time.Second, + RetryPolicy: &cadence.RetryPolicy{ + InitialInterval: time.Nanosecond, + BackoffCoefficient: 1, + MaximumAttempts: 100, + }, + } + ctx = workflow.WithLocalActivityOptions(ctx, ao) + + mu := sync.Mutex{} + sum := 0 + wg := workflow.NewWaitGroup(ctx) + + for i := 0; i < 20; i++ { + wg.Add(1) + workflow.Go(ctx, func(ctx workflow.Context) { + res := Result{} + // try doing an activity which crashes + err := workflow.ExecuteLocalActivity(ctx, SumActivity).Get(ctx, &res) + if err == nil { + if res.Result != nil { + mu.Lock() + sum += *res.Result + mu.Unlock() + } + } else { + log.Error("err - something went wrong ", zap.Error(err)) + } + wg.Done() + }) + } + wg.Wait(ctx) + log.Info(">>> Result - should be 100: ", zap.Int("sum", sum), zap.Int("side-effects", sideEffectsCapture)) + return nil +} + +func testNormalActivity(ctx 
workflow.Context) error { + log := workflow.GetLogger(ctx) + sideEffectsCapture = 0 + ao := workflow.ActivityOptions{ + StartToCloseTimeout: time.Minute, + ScheduleToCloseTimeout: time.Second, + ScheduleToStartTimeout: time.Second, + RetryPolicy: &cadence.RetryPolicy{ + InitialInterval: time.Nanosecond, + BackoffCoefficient: 1, + MaximumAttempts: 100, + }, + } + ctx = workflow.WithActivityOptions(ctx, ao) + + mu := sync.Mutex{} + sum := 0 + wg := workflow.NewWaitGroup(ctx) + + for i := 0; i < 20; i++ { + wg.Add(1) + workflow.Go(ctx, func(ctx workflow.Context) { + res := Result{} + // try doing an activity which crashes + err := workflow.ExecuteActivity(ctx, SumActivity).Get(ctx, &res) + if err == nil { + if res.Result != nil { + mu.Lock() + sum += *res.Result + mu.Unlock() + } + } else { + log.Error("err - something went wrong ", zap.Error(err)) + } + wg.Done() + }) + } + wg.Wait(ctx) + log.Info(">>> Result - should be 100: ", zap.Int("sum", sum), zap.Int("side-effects", sideEffectsCapture)) + return nil +} + +// an activity which crashes a lot and times out a lot +// and is expected to give the right result ultimately through +// the use of an aggressive retry policy +func SumActivity(ctx context.Context) (*Result, error) { + + if rand.Intn(10) > 5 { + fmt.Println(">> returning an error") + return nil, fmt.Errorf("an error") + } + + if rand.Intn(10) > 5 { + // timeout + time.Sleep(time.Second) + } + + //if rand.Intn(10) > 5 { + // // at the time of writing panics aren't handled by localactivities + // panic("an error") + //} + + output := 5 + globalMu.Lock() + sideEffectsCapture += 5 + globalMu.Unlock() + return &Result{Result: &output}, nil +} diff --git a/cmd/samples/recipes/localactivityfailure/main.go b/cmd/samples/recipes/localactivityfailure/main.go new file mode 100644 index 0000000..036f8b7 --- /dev/null +++ b/cmd/samples/recipes/localactivityfailure/main.go @@ -0,0 +1,76 @@ +package main + +import ( + "flag" + "time" + + "github.com/pborman/uuid" + 
"go.uber.org/cadence/client" + "go.uber.org/cadence/worker" + + "github.com/uber-common/cadence-samples/cmd/samples/common" +) + +const ( + tl = "localActivityFailure" +) + +// This needs to be done as part of a bootstrap step when the process starts. +// The workers are supposed to be long running. +func startWorkers(h *common.SampleHelper) { + // Configure worker options. + workerOptions := worker.Options{ + MetricsScope: h.WorkerMetricScope, + Logger: h.Logger, + } + h.StartWorkers(h.Config.DomainName, tl, workerOptions) +} + +func startNormalActivityTest(h *common.SampleHelper) { + workflowOptions := client.StartWorkflowOptions{ + ID: "localactivityfailure_" + uuid.New(), + TaskList: tl, + ExecutionStartToCloseTimeout: time.Minute * 3, + DecisionTaskStartToCloseTimeout: time.Minute, + WorkflowIDReusePolicy: client.WorkflowIDReusePolicyAllowDuplicate, + } + h.StartWorkflow(workflowOptions, testNormalActivity) +} + +func startLocalActivityTest(h *common.SampleHelper) { + workflowOptions := client.StartWorkflowOptions{ + ID: "localactivityfailure_" + uuid.New(), + TaskList: tl, + ExecutionStartToCloseTimeout: time.Minute * 3, + DecisionTaskStartToCloseTimeout: time.Minute, + WorkflowIDReusePolicy: client.WorkflowIDReusePolicyAllowDuplicate, + } + h.StartWorkflow(workflowOptions, testLocalActivity) +} + +func main() { + var mode, workflowID, signal string + flag.StringVar(&mode, "m", "trigger-local", "Mode is worker, trigger-local or trigger-normal.") + flag.StringVar(&workflowID, "w", "", "WorkflowID") + flag.StringVar(&signal, "s", "signal_data", "SignalData") + flag.Parse() + + var h common.SampleHelper + h.SetupServiceConfig() + + switch mode { + case "worker": + h.RegisterWorkflow(testLocalActivity) + h.RegisterWorkflow(testNormalActivity) + h.RegisterActivity(SumActivity) + startWorkers(&h) + + // The workers are supposed to be long running process that should not exit. + // Use select{} to block indefinitely for samples, you can quit by CMD+C. 
+ select {} + case "trigger-local": + startLocalActivityTest(&h) + case "trigger-normal": + startNormalActivityTest(&h) + } +} From b1f325c9b3fa051c6c44171cc47c4c74507faaf5 Mon Sep 17 00:00:00 2001 From: David Porter Date: Mon, 29 May 2023 23:50:30 -0700 Subject: [PATCH 2/3] fix comment --- cmd/samples/recipes/localactivityfailure/failure.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cmd/samples/recipes/localactivityfailure/failure.go b/cmd/samples/recipes/localactivityfailure/failure.go index ff2ebcb..05e6299 100644 --- a/cmd/samples/recipes/localactivityfailure/failure.go +++ b/cmd/samples/recipes/localactivityfailure/failure.go @@ -18,7 +18,8 @@ func init() { var globalMu sync.Mutex -// a data-racy, completely problematic +// a data-racy, completely problematic means to capture the completion of an activity +// locally and show where sometimes it's being retried var sideEffectsCapture int type Result struct { @@ -29,7 +30,7 @@ func testLocalActivity(ctx workflow.Context) error { log := workflow.GetLogger(ctx) // zero out the global variable to start with to ensure test is sane // this workflow is only a demonstration and global variables like this are *A BAD IDEA* - // it's being expicitly used to demonstrate how the local activities can cause side-effects to be replaced + // it's being explicitly used to demonstrate how side-effects are occurring sideEffectsCapture = 0 ao := workflow.LocalActivityOptions{ ScheduleToCloseTimeout: time.Second, From 21fefa63aade21e9c08c8af73766f5895f3caa40 Mon Sep 17 00:00:00 2001 From: David Porter Date: Mon, 29 May 2023 23:56:20 -0700 Subject: [PATCH 3/3] readme --- .../recipes/localactivityfailure/README.md | 27 +++++++------------ 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/cmd/samples/recipes/localactivityfailure/README.md b/cmd/samples/recipes/localactivityfailure/README.md index 2ca491d..cebb38b 100644 --- a/cmd/samples/recipes/localactivityfailure/README.md +++ 
b/cmd/samples/recipes/localactivityfailure/README.md @@ -1,7 +1,6 @@ -This sample workflow demos how to use local activity to execute short/quick operations efficiently. +### Background -local_activity_workflow.go shows how to use local activity -local_activity_workflow_test.go shows how to unit-test workflow with local activity +This sample workflow demos how a normal activity and a local activity each deal with a crashing, timing-out, misbehaving activity through an aggressive retry policy. Steps to run this sample: 1) You need a cadence service running. See details in cmd/samples/README.md @@ -9,19 +8,13 @@ Steps to run this sample: ``` ./bin/localactivityfailure -m worker ``` -3) Run the following command to trigger a workflow execution. You should see workflowID and runID print out on screen. +3) Run the following command to trigger a workflow execution that uses local activities. You should see the workflowID and runID printed on screen, and a final log entry showing the result. ``` -./bin/localactivityfailure -m trigger +./bin/localactivityfailure -m trigger-local ``` -4) Run the following command to send signal "_1_" to the running workflow. You should see output that indicate 5 local activity has been run to check the conditions and one condition will be true which result in one activity to be scheduled. -``` -./bin/localactivityfailure -m signal -s _1_ -w -``` -5) Repeat step 4, but with different signal data, for example, send signal like _2_4_ to make 2 conditions true. -``` -./bin/localactivityfailure -m signal -s _2_4_ -w -``` -6) Run the following command this will exit the workflow. -``` -./bin/localactivityfailure -m signal -s exit -``` \ No newline at end of file + +### Test + +The test is relatively simple: run an activity which will fail quite a lot, or time out, and retry it often enough that it eventually completes. Do this 20 times. + +As a simple verification, the integer returned by each activity (5 in this instance), summed over the 20 runs, ought to equal 100 if all the activities eventually pass. 
More than that would be a failure of correctness, and less would indicate that the failures prevented some activities from completing. \ No newline at end of file