Skip to content

Commit

Permalink
Change Result to an Interface
Browse files Browse the repository at this point in the history
  • Loading branch information
ccremer committed Mar 21, 2022
1 parent 46bf9bf commit d6b1fc7
Show file tree
Hide file tree
Showing 16 changed files with 131 additions and 121 deletions.
19 changes: 9 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ Small Go utility that executes business actions in a pipeline.
import (
"context"
pipeline "github.com/ccremer/go-command-pipeline"
"github.com/ccremer/go-command-pipeline/predicate"
)

type Data struct {
Expand All @@ -24,7 +23,7 @@ func main() {
data := &Data // define arbitrary data to pass around in the steps.
p := pipeline.NewPipeline()
p.WithSteps(
pipeline.NewStep("define random number", defineNumber),
pipeline.NewStepFromFunc("define random number", defineNumber),
pipeline.NewStepFromFunc("print number", printNumber),
)
result := p.RunWithContext(context.WithValue(context.Background, "data", data))
Expand All @@ -33,9 +32,9 @@ func main() {
}
}

func defineNumber(ctx context.Context) pipeline.Result {
func defineNumber(ctx context.Context) error {
ctx.Value("data").(*Data).Number = 10
return pipeline.Result{}
return nil
}

// Let's assume this is a business function that can fail.
Expand Down Expand Up @@ -76,18 +75,18 @@ It could be simplified to something like this:
```go
func Persist(data *Data) error {
p := pipeline.NewPipeline().WithSteps(
pipeline.NewStep("prepareTransaction", prepareTransaction()),
pipeline.NewStep("executeQuery", executeQuery()),
pipeline.NewStep("commitTransaction", commit()),
pipeline.NewStepFromFunc("prepareTransaction", prepareTransaction()),
pipeline.NewStepFromFunc("executeQuery", executeQuery()),
pipeline.NewStepFromFunc("commitTransaction", commit()),
)
return p.RunWithContext(context.WithValue(context.Background(), myKey, data).Err
}

func executeQuery() pipeline.ActionFunc {
return func(ctx context.Context) pipeline.Result {
func executeQuery() error {
return func(ctx context.Context) error {
data := ctx.Value(myKey).(*Data)
err := database.executeQuery("SOME QUERY", data)
return pipeline.Result{Err: err}
return err
)
}
...
Expand Down
3 changes: 2 additions & 1 deletion examples/abort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ func TestExample_Abort(t *testing.T) {
pipeline.NewStepFromFunc("never executed", doNotExecute),
)
result := p.Run()
assert.True(t, result.IsSuccessful())
assert.True(t, result.IsCompleted())
assert.True(t, result.IsAborted())
assert.False(t, result.IsSuccessful())
}

func doNotExecute(_ context.Context) error {
Expand Down
6 changes: 3 additions & 3 deletions examples/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestExample_Context(t *testing.T) {
// Create pipeline with defaults
p := pipeline.NewPipeline()
p.WithSteps(
pipeline.NewStep("define random number", defineNumber),
pipeline.NewStepFromFunc("define random number", defineNumber),
pipeline.NewStepFromFunc("print number", printNumber),
)
result := p.RunWithContext(context.WithValue(context.Background(), key, &Data{}))
Expand All @@ -31,9 +31,9 @@ func TestExample_Context(t *testing.T) {
}
}

func defineNumber(ctx context.Context) pipeline.Result {
func defineNumber(ctx context.Context) error {
ctx.Value(key).(*Data).Number = rand.Int()
return pipeline.Result{}
return nil
}

func printNumber(ctx context.Context) error {
Expand Down
32 changes: 16 additions & 16 deletions examples/git_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ import (
func TestExample_Git(t *testing.T) {
p := pipeline.NewPipeline()
p.WithSteps(
pipeline.ToStep("clone repository", CloneGitRepository(), pipeline.Not(DirExists("my-repo"))),
pipeline.NewStep("checkout branch", CheckoutBranch()),
pipeline.NewStep("pull", Pull()).WithResultHandler(logSuccess),
CloneGitRepository(),
CheckoutBranch(),
Pull().WithResultHandler(logSuccess),
)
result := p.Run()
if !result.IsSuccessful() {
Expand All @@ -27,29 +27,29 @@ func TestExample_Git(t *testing.T) {
}

func logSuccess(_ context.Context, result pipeline.Result) error {
log.Println("handler called")
log.Println("handler called", result.Name())
return result.Err()
}

func CloneGitRepository() pipeline.ActionFunc {
return func(_ context.Context) pipeline.Result {
func CloneGitRepository() pipeline.Step {
return pipeline.ToStep("clone repository", func(_ context.Context) error {
err := execGitCommand("clone", "[email protected]/ccremer/go-command-pipeline")
return pipeline.NewResultWithError("clone repository", err)
}
return err
}, pipeline.Not(DirExists("my-repo")))
}

func Pull() pipeline.ActionFunc {
return func(_ context.Context) pipeline.Result {
func Pull() pipeline.Step {
return pipeline.NewStepFromFunc("pull", func(_ context.Context) error {
err := execGitCommand("pull")
return pipeline.NewResultWithError("pull", err)
}
return err
})
}

func CheckoutBranch() pipeline.ActionFunc {
return func(_ context.Context) pipeline.Result {
func CheckoutBranch() pipeline.Step {
return pipeline.NewStepFromFunc("checkout branch", func(_ context.Context) error {
err := execGitCommand("checkout", "master")
return pipeline.NewResultWithError("checkout branch", err)
}
return err
})
}

func execGitCommand(args ...string) error {
Expand Down
2 changes: 1 addition & 1 deletion fanout.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func NewFanOutStep(name string, pipelineSupplier Supplier, handler ParallelResul
}
wg.Wait()
res := collectResults(ctx, handler, &m)
return setResultErrorFromContext(ctx, res)
return setResultErrorFromContext(ctx, name, res)
}
return step
}
16 changes: 8 additions & 8 deletions fanout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ func TestNewFanOutStep(t *testing.T) {
"GivenPipelineWith_WhenRunningStep_ThenReturnSuccessButRunErrorHandler": {
jobs: 1,
returnErr: fmt.Errorf("should be called"),
givenResultHandler: func(ctx context.Context, _ map[uint64]Result) Result {
givenResultHandler: func(ctx context.Context, _ map[uint64]Result) error {
atomic.AddUint64(&counts, 1)
return Result{}
return nil
},
expectedCounts: 2,
},
Expand All @@ -43,9 +43,9 @@ func TestNewFanOutStep(t *testing.T) {
goleak.VerifyNone(t)
handler := tt.givenResultHandler
if handler == nil {
handler = func(ctx context.Context, results map[uint64]Result) Result {
handler = func(ctx context.Context, results map[uint64]Result) error {
assert.NoError(t, results[0].Err())
return Result{}
return nil
}
}
step := NewFanOutStep("fanout", func(_ context.Context, funcs chan *Pipeline) {
Expand Down Expand Up @@ -82,9 +82,9 @@ func TestNewFanOutStep_Cancel(t *testing.T) {
}
}
t.Fail() // should not reach this
}, func(ctx context.Context, results map[uint64]Result) Result {
}, func(ctx context.Context, results map[uint64]Result) error {
assert.Len(t, results, 3)
return NewResultWithError("fanout", fmt.Errorf("some error"))
return fmt.Errorf("some error")
})
ctx, cancel := context.WithTimeout(context.Background(), 25*time.Millisecond)
defer cancel()
Expand Down Expand Up @@ -112,13 +112,13 @@ func ExampleNewFanOutStep() {
}))
}
}
}, func(ctx context.Context, results map[uint64]Result) Result {
}, func(ctx context.Context, results map[uint64]Result) error {
for worker, result := range results {
if result.IsFailed() {
fmt.Println(fmt.Sprintf("Worker %d failed: %v", worker, result.Err()))
}
}
return Result{}
return nil
})
p.AddStep(fanout)
p.Run()
Expand Down
10 changes: 5 additions & 5 deletions pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ func (p *Pipeline) Run() Result {
func (p *Pipeline) RunWithContext(ctx context.Context) Result {
result := p.doRun(ctx)
if p.finalizer != nil {
result.err = p.finalizer(ctx, result)
err := p.finalizer(ctx, result)
return newResult(result.Name(), err, result.IsAborted(), result.IsCanceled())
}
return result
}
Expand All @@ -131,21 +132,20 @@ func (p *Pipeline) doRun(ctx context.Context) Result {
result := step.F(ctx)
var err error
if step.H != nil {
result.name = step.Name
err = step.H(ctx, result)
} else {
err = result.Err()
}
if err != nil {
if errors.Is(err, ErrAbort) {
// Abort pipeline without error
return Result{aborted: true, name: step.Name}
return newResult(step.Name, nil, true, false)
}
return p.fail(err, step)
}
}
}
return Result{name: name}
return newEmptyResult(name)
}

func (p *Pipeline) fail(err error, step Step) Result {
Expand All @@ -156,5 +156,5 @@ func (p *Pipeline) fail(err error, step Step) Result {
resultErr = fmt.Errorf("step %q failed: %w", step.Name, err)
}
canceled := errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
return NewResult(step.Name, resultErr, false, canceled)
return newResult(step.Name, resultErr, false, canceled)
}
38 changes: 19 additions & 19 deletions pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,16 @@ func TestPipeline_Run(t *testing.T) {
givenSteps: []Step{
NewStep("test-step", func(_ context.Context) Result {
callCount += 1
return Result{}
return newEmptyResult("test-step")
}),
},
expectedCalls: 1,
},
"GivenSingleStep_WhenBeforeHookGiven_ThenCallBeforeHook": {
givenSteps: []Step{
NewStep("test-step", func(_ context.Context) Result {
NewStepFromFunc("test-step", func(_ context.Context) error {
callCount += hook.calls + 1
return Result{}
return nil
}),
},
givenBeforeHook: hook.Accept,
Expand All @@ -58,9 +58,9 @@ func TestPipeline_Run(t *testing.T) {
},
"GivenSingleStepWithoutHandler_WhenRunningWithError_ThenReturnError": {
givenSteps: []Step{
NewStep("test-step", func(_ context.Context) Result {
NewStepFromFunc("test-step", func(_ context.Context) error {
callCount += 1
return Result{err: errors.New("step failed")}
return errors.New("step failed")
}),
},
expectedCalls: 1,
Expand All @@ -84,33 +84,33 @@ func TestPipeline_Run(t *testing.T) {
},
"GivenSingleStepWithHandler_WhenRunningWithError_ThenAbortWithError": {
givenSteps: []Step{
NewStep("test-step", func(_ context.Context) Result {
NewStepFromFunc("test-step", func(_ context.Context) error {
callCount += 1
return Result{}
return nil
}).WithResultHandler(func(_ context.Context, result Result) error {
callCount += 1
return errors.New("handler")
}),
NewStep("don't run this step", func(_ context.Context) Result {
NewStepFromFunc("don't run this step", func(_ context.Context) error {
callCount += 1
return Result{}
return nil
}),
},
expectedCalls: 2,
expectErrorString: "handler",
},
"GivenSingleStepWithHandler_WhenNullifyingError_ThenContinuePipeline": {
givenSteps: []Step{
NewStep("test-step", func(_ context.Context) Result {
NewStepFromFunc("test-step", func(_ context.Context) error {
callCount += 1
return Result{err: errors.New("failed step")}
return errors.New("failed step")
}).WithResultHandler(func(_ context.Context, result Result) error {
callCount += 1
return nil
}),
NewStep("continue", func(_ context.Context) Result {
NewStepFromFunc("continue", func(_ context.Context) error {
callCount += 1
return Result{}
return nil
}),
},
additionalAssertions: func(t *testing.T, result Result) {
Expand All @@ -120,14 +120,14 @@ func TestPipeline_Run(t *testing.T) {
},
"GivenNestedPipeline_WhenParentPipelineRuns_ThenRunNestedAsWell": {
givenSteps: []Step{
NewStep("test-step", func(_ context.Context) Result {
NewStepFromFunc("test-step", func(_ context.Context) error {
callCount += 1
return Result{}
return nil
}),
NewPipeline().
AddStep(NewStep("nested-step", func(_ context.Context) Result {
AddStep(NewStepFromFunc("nested-step", func(_ context.Context) error {
callCount += 1
return Result{}
return nil
})).AsNestedStep("nested-pipeline"),
},
expectedCalls: 2,
Expand All @@ -136,9 +136,9 @@ func TestPipeline_Run(t *testing.T) {
givenSteps: []Step{
NewPipeline().
WithNestedSteps("nested-pipeline",
NewStep("nested-step", func(_ context.Context) Result {
NewStepFromFunc("nested-step", func(_ context.Context) error {
callCount += 1
return Result{}
return nil
})),
},
expectedCalls: 1,
Expand Down
2 changes: 1 addition & 1 deletion pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func NewWorkerPoolStep(name string, size int, pipelineSupplier Supplier, handler

wg.Wait()
res := collectResults(ctx, handler, &m)
return setResultErrorFromContext(ctx, res)
return setResultErrorFromContext(ctx, name, res)
}
return step
}
Expand Down
Loading

0 comments on commit d6b1fc7

Please sign in to comment.