Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding activity failures test #74

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ PROGS = helloworld \
splitmerge \
timer \
localactivity \
localactivityfailure \
query \
consistentquery \
cron \
Expand Down Expand Up @@ -111,6 +112,9 @@ timer:
localactivity:
go build -o bin/localactivity cmd/samples/recipes/localactivity/*.go

localactivityfailure:
go build -o bin/localactivityfailure cmd/samples/recipes/localactivityfailure/*.go

query:
go build -o bin/query cmd/samples/recipes/query/*.go

Expand Down Expand Up @@ -153,6 +157,11 @@ signalcounter:
crossdomain:
go build -o bin/crossdomain cmd/samples/recipes/crossdomain/*.go


setup:
# registers the global domain "samples-domain" (active cluster: cluster0) with the cadence server at 127.0.0.1:7933
cadence --ad 127.0.0.1:7933 --env development --do samples-domain domain register --ac cluster0 --gd true

crossdomain-setup:
# use the ..cadence-server --env development_xdc_cluster0 ... to set up three
cadence --ad 127.0.0.1:7933 --env development --do domain0 domain register --ac cluster0 --gd true --clusters cluster0 cluster1 # global domain required
Expand Down Expand Up @@ -188,6 +197,7 @@ bins: helloworld \
expense_dummy \
expense \
localactivity \
localactivityfailure \
query \
consistentquery \
recovery \
Expand Down
27 changes: 27 additions & 0 deletions cmd/samples/recipes/localactivityfailure/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
This sample demonstrates how local activities and regular activities behave when an activity fails or times out and is retried under an aggressive retry policy.

local_activity_workflow.go shows how to use local activity
local_activity_workflow_test.go shows how to unit-test workflow with local activity

Steps to run this sample:
1) You need a cadence service running. See details in cmd/samples/README.md
2) Run the following command to start worker
```
./bin/localactivityfailure -m worker
```
3) Run the following command to trigger a workflow execution. You should see workflowID and runID print out on screen.
```
./bin/localactivityfailure -m trigger-local
```
4) Run the following command to send the signal "_1_" to the running workflow. You should see output indicating that 5 local activities have been run to check the conditions, and that one condition is true, which results in one activity being scheduled.
```
./bin/localactivityfailure -m signal -s _1_ -w <workflow ID from step 3>
```
5) Repeat step 4 with different signal data; for example, send a signal such as _2_4_ to make 2 conditions true.
```
./bin/localactivityfailure -m signal -s _2_4_ -w <workflow ID from step 3>
```
6) Run the following command; this will exit the workflow.
```
./bin/localactivityfailure -m signal -s exit
```
138 changes: 138 additions & 0 deletions cmd/samples/recipes/localactivityfailure/failure.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package main

import (
"context"
"fmt"
"go.uber.org/cadence"
"go.uber.org/zap"
"math/rand"
"sync"
"time"

"go.uber.org/cadence/workflow"
)

// init seeds the shared math/rand source with the current time in
// nanoseconds when the package is loaded.
func init() {
	seed := time.Now().UnixNano()
	rand.Seed(seed)
}

// Package-level state shared by SumActivity and the test workflows.
var (
	// globalMu is held by SumActivity while it updates sideEffectsCapture.
	globalMu sync.Mutex

	// sideEffectsCapture accumulates the side effects applied by SumActivity.
	// It is deliberately problematic, data-race-prone global state that exists
	// only for this demonstration.
	sideEffectsCapture int
)

// Result is the payload returned by SumActivity and decoded by the test
// workflows.
type Result struct {
	// Result points to the activity's numeric output; the workflows only add
	// it to their sum when it is non-nil.
	Result *int
}

// testLocalActivity is a workflow that runs SumActivity 20 times concurrently
// as a local activity and logs the total of the returned values together with
// the global sideEffectsCapture counter.
//
// Each successful SumActivity call contributes 5, so the expected sum is 100.
// Activity errors are logged rather than propagated; the workflow always
// returns nil.
func testLocalActivity(ctx workflow.Context) error {
	log := workflow.GetLogger(ctx)

	// Zero out the global counter so the run starts from a known state.
	// This workflow is only a demonstration and global variables like this are
	// *A BAD IDEA*. It is used explicitly to compare how often the activity
	// body applied its side effect with what the workflow received
	// (presumably retried attempts can apply it more than once).
	// SumActivity updates the counter under globalMu, so take the same lock.
	globalMu.Lock()
	sideEffectsCapture = 0
	globalMu.Unlock()

	ao := workflow.LocalActivityOptions{
		ScheduleToCloseTimeout: time.Second,
		// Retry almost immediately, up to 100 attempts.
		RetryPolicy: &cadence.RetryPolicy{
			InitialInterval:    time.Nanosecond,
			BackoffCoefficient: 1,
			MaximumAttempts:    100,
		},
	}
	ctx = workflow.WithLocalActivityOptions(ctx, ao)

	mu := sync.Mutex{}
	sum := 0
	wg := workflow.NewWaitGroup(ctx)

	for i := 0; i < 20; i++ {
		wg.Add(1)
		workflow.Go(ctx, func(ctx workflow.Context) {
			res := Result{}
			// Try doing an activity which fails and stalls at random.
			err := workflow.ExecuteLocalActivity(ctx, SumActivity).Get(ctx, &res)
			if err == nil {
				if res.Result != nil {
					mu.Lock()
					sum += *res.Result
					mu.Unlock()
				}
			} else {
				log.Error("err - something went wrong ", zap.Error(err))
			}
			wg.Done()
		})
	}
	wg.Wait(ctx)

	// Read the counter under the same lock SumActivity writes it with.
	globalMu.Lock()
	sideEffects := sideEffectsCapture
	globalMu.Unlock()

	log.Info(">>> Result - should be 100: ", zap.Int("sum", sum), zap.Int("side-effects", sideEffects))
	return nil
}

// testNormalActivity is a workflow that runs SumActivity 20 times concurrently
// as a regular (non-local) activity and logs the total of the returned values
// together with the global sideEffectsCapture counter.
//
// Each successful SumActivity call contributes 5, so the expected sum is 100.
// Activity errors are logged rather than propagated; the workflow always
// returns nil.
func testNormalActivity(ctx workflow.Context) error {
	log := workflow.GetLogger(ctx)

	// Reset the demonstration counter. SumActivity updates it under globalMu,
	// so take the same lock here.
	globalMu.Lock()
	sideEffectsCapture = 0
	globalMu.Unlock()

	// NOTE(review): ScheduleToCloseTimeout (1s) is shorter than
	// StartToCloseTimeout (1m) - confirm this combination is intended.
	ao := workflow.ActivityOptions{
		StartToCloseTimeout:    time.Minute,
		ScheduleToCloseTimeout: time.Second,
		ScheduleToStartTimeout: time.Second,
		// Retry almost immediately, up to 100 attempts.
		RetryPolicy: &cadence.RetryPolicy{
			InitialInterval:    time.Nanosecond,
			BackoffCoefficient: 1,
			MaximumAttempts:    100,
		},
	}
	ctx = workflow.WithActivityOptions(ctx, ao)

	mu := sync.Mutex{}
	sum := 0
	wg := workflow.NewWaitGroup(ctx)

	for i := 0; i < 20; i++ {
		wg.Add(1)
		workflow.Go(ctx, func(ctx workflow.Context) {
			res := Result{}
			// Try doing an activity which fails and stalls at random.
			err := workflow.ExecuteActivity(ctx, SumActivity).Get(ctx, &res)
			if err == nil {
				if res.Result != nil {
					mu.Lock()
					sum += *res.Result
					mu.Unlock()
				}
			} else {
				log.Error("err - something went wrong ", zap.Error(err))
			}
			wg.Done()
		})
	}
	wg.Wait(ctx)

	// Read the counter under the same lock SumActivity writes it with.
	globalMu.Lock()
	sideEffects := sideEffectsCapture
	globalMu.Unlock()

	log.Info(">>> Result - should be 100: ", zap.Int("sum", sum), zap.Int("side-effects", sideEffects))
	return nil
}

// SumActivity is an activity which fails a lot and stalls a lot, and is
// expected to give the right result ultimately through the use of an
// aggressive retry policy.
//
// On success it adds 5 to sideEffectsCapture (under globalMu) and returns a
// Result holding 5.
func SumActivity(ctx context.Context) (*Result, error) {
	// Random failure: a roll above 5 (out of 0-9) returns an error.
	if roll := rand.Intn(10); roll > 5 {
		fmt.Println(">> returning an error")
		return nil, fmt.Errorf("an error")
	}

	// Random stall: a roll above 5 sleeps for a second to provoke a timeout.
	if roll := rand.Intn(10); roll > 5 {
		time.Sleep(time.Second)
	}

	// A random panic is deliberately not simulated here: at the time of
	// writing panics aren't handled by local activities.

	const increment = 5
	value := increment

	globalMu.Lock()
	sideEffectsCapture += increment
	globalMu.Unlock()

	return &Result{Result: &value}, nil
}
76 changes: 76 additions & 0 deletions cmd/samples/recipes/localactivityfailure/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package main

import (
"flag"
"time"

"github.com/pborman/uuid"
"go.uber.org/cadence/client"
"go.uber.org/cadence/worker"

"github.com/uber-common/cadence-samples/cmd/samples/common"
)

// tl is the name of the task list the test workflows are started on.
const tl = "localActivityFailure"

// startWorkers starts the sample's workers for the configured domain on the
// tl task list.
//
// This needs to be done as part of a bootstrap step when the process starts.
// The workers are supposed to be long running.
func startWorkers(h *common.SampleHelper) {
	// Configure worker options.
	workerOptions := worker.Options{
		MetricsScope: h.WorkerMetricScope,
		Logger:       h.Logger,
	}
	// Use the shared constant so the workers always poll the same task list
	// that the workflows are started on.
	h.StartWorkers(h.Config.DomainName, tl, workerOptions)
}

func startNormalActivityTest(h *common.SampleHelper) {
workflowOptions := client.StartWorkflowOptions{
ID: "localactivityfailure_" + uuid.New(),
TaskList: tl,
ExecutionStartToCloseTimeout: time.Minute * 3,
DecisionTaskStartToCloseTimeout: time.Minute,
WorkflowIDReusePolicy: client.WorkflowIDReusePolicyAllowDuplicate,
}
h.StartWorkflow(workflowOptions, testNormalActivity)
}

func startLocalActivityTest(h *common.SampleHelper) {
workflowOptions := client.StartWorkflowOptions{
ID: "localactivityfailure_" + uuid.New(),
TaskList: tl,
ExecutionStartToCloseTimeout: time.Minute * 3,
DecisionTaskStartToCloseTimeout: time.Minute,
WorkflowIDReusePolicy: client.WorkflowIDReusePolicyAllowDuplicate,
}
h.StartWorkflow(workflowOptions, testLocalActivity)
}

// main parses the command-line flags and either runs the worker or starts one
// of the two test workflows.
//
// Modes (-m):
//   - worker: register both workflows and SumActivity, start the workers and
//     block forever
//   - trigger-local: start the testLocalActivity workflow
//   - trigger-normal: start the testNormalActivity workflow
//
// Any other mode (including the default "trigger") prints the flag usage.
func main() {
	// NOTE: -w and -s are still parsed so existing invocations keep working,
	// but no mode in this function reads workflowID or signal.
	var mode, workflowID, signal string
	flag.StringVar(&mode, "m", "trigger", "Mode is worker, trigger-local or trigger-normal.")
	flag.StringVar(&workflowID, "w", "", "WorkflowID")
	flag.StringVar(&signal, "s", "signal_data", "SignalData")
	flag.Parse()

	var h common.SampleHelper
	h.SetupServiceConfig()

	switch mode {
	case "worker":
		h.RegisterWorkflow(testLocalActivity)
		h.RegisterWorkflow(testNormalActivity)
		h.RegisterActivity(SumActivity)
		startWorkers(&h)

		// The workers are supposed to be long running process that should not exit.
		// Use select{} to block indefinitely for samples, you can quit by CMD+C.
		select {}
	case "trigger-local":
		// Previously this case started the normal-activity workflow by mistake.
		startLocalActivityTest(&h)
	case "trigger-normal":
		// Previously this case started the local-activity workflow by mistake.
		startNormalActivityTest(&h)
	default:
		// Unknown mode: show the available flags instead of exiting silently.
		flag.Usage()
	}
}