Skip to content

Commit

Permalink
Merge pull request #5 from worldline-go/fix/context-cancel-after-fail…
Browse files Browse the repository at this point in the history
…ed-input

Failed input cancels worker context
  • Loading branch information
marekfilip authored Aug 23, 2023
2 parents 39ef04f + 508082f commit 4869c63
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 12 deletions.
75 changes: 72 additions & 3 deletions coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (e *executionTracker) assertExpectation(t *testing.T) {
return e.gotTrack[i].timestamp < e.gotTrack[j].timestamp
})

require.Equalf(t, len(e.expectedTrack), len(e.gotTrack), "run count should be equal to expectedOrder")
require.Equalf(t, len(e.expectedTrack), len(e.gotTrack), "incorrect run count in execution tracker")

// check the order
for i := range e.expectedTrack {
Expand Down Expand Up @@ -86,13 +86,83 @@ func TestCoordinator_RunConcurrent(t *testing.T) {
name: "test acceptable input data",
testFunc: testAcceptableInputData,
},
{
name: "test if failing on first input block second input",
testFunc: testReusingContextForMultiInputs,
},
}

for _, tt := range tests {
t.Run(tt.name, tt.testFunc)
}
}

// job for first input fail which leads to context cancellation,
// this should not lead to block processing of another inputs.
func testReusingContextForMultiInputs(t *testing.T) {
dummyErr := errors.New("dummy error")

// setting single worker
ctx := context.Background()

et := &executionTracker{
expectedTrack: []track{
{name: "1", callbackName: "preCheck"},
{name: "2", callbackName: "preCheck"},
{name: "2", callbackName: "job"},
{name: "shouldRunOnlyFor2", callbackName: "preCheck"},
{name: "shouldRunOnlyFor2", callbackName: "job"},
},
}

c, err := prepareCoordinatorWithSteps(
[]*choreograph.Step{
{
Name: "shouldRunForBoth",
Job: func(_ context.Context, input interface{}) error {
inputToStr := input.(string)

if inputToStr == "1" {
return dummyErr
}

et.registerExecution(inputToStr, "job")

return nil
},
PreCheck: func(_ context.Context, input interface{}) error {
inputToStr := input.(string)

et.registerExecution(inputToStr, "preCheck")

return nil
},
},
{
Name: "shouldRunOnlyFor2",
Job: et.getExecutionFn("shouldRunOnlyFor2", "job"),
PreCheck: et.getExecutionFn("shouldRunOnlyFor2", "preCheck"),
},
},
choreograph.WithWorkerCount(1),
)

require.NoError(t, err)

results, err := c.RunConcurrent(ctx, []interface{}{"1", "2"})
require.NoError(t, err)

out := <-results
require.ErrorIs(t, out.ExecutionErrors[0], dummyErr)
require.Error(t, out.RuntimeError)

out = <-results
require.Lenf(t, out.ExecutionErrors, 0, "no execution errors expected")
require.NoError(t, out.RuntimeError)

et.assertExpectation(t)
}

func testAcceptableInputData(t *testing.T) {
coordinator := choreograph.NewCoordinator()

Expand Down Expand Up @@ -553,13 +623,12 @@ func testExecutionCorrectness(t *testing.T) {
}

func testExecutionContinueCancel(t *testing.T) {
dummyErr := errors.New("dummy error")
et := &executionTracker{}

errReturningFn := func(name, callbackName string) func(_ context.Context) error {
return func(_ context.Context) error {
et.registerExecution(name, callbackName)
return dummyErr
return errors.New("dummy error")
}
}

Expand Down
12 changes: 3 additions & 9 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,13 @@ type Result struct {
}

type worker struct {
ctxCancel context.CancelFunc
steps Steps
err []error
steps Steps
err []error
}

// StartWorker starts listening process to handle new inputs.
func (w *worker) StartWorker(ctx context.Context, inputs <-chan interface{}, results chan<- Result, group *sync.WaitGroup) {
workerCtx := context.WithValue(ctx, DataBagContextKey, new(DataBag))
workerCtx, w.ctxCancel = context.WithCancel(workerCtx)

for i := range inputs {
results <- Result{
Expand Down Expand Up @@ -87,9 +85,7 @@ func (w *worker) executeStep(ctx context.Context, stepIdx int, input interface{}
w.err = append(w.err, errors.Wrapf(err, "preCheck '%s'", w.steps.StepName(stepIdx)))

if errors.Is(err, ErrExecutionCanceled) || errors.Is(err, ErrUnexpectedType) {
w.ctxCancel()

return ErrExecutionCanceled
return errors.Wrap(err, "preCheck cancelled")
}

return nil
Expand All @@ -109,8 +105,6 @@ func (w *worker) executeStep(ctx context.Context, stepIdx int, input interface{}
if err != nil {
w.err = append(w.err, errors.Wrapf(err, "job '%s'", w.steps.StepName(stepIdx)))

w.ctxCancel()

return ErrJobFailed
}

Expand Down

0 comments on commit 4869c63

Please sign in to comment.