From 2c05cbfba2b25a346aceb3feb679b213cbf79366 Mon Sep 17 00:00:00 2001 From: Chris Date: Tue, 12 Apr 2022 14:39:59 +0200 Subject: [PATCH] Add AddStepFromFunc convenience method --- pipeline.go | 5 +++++ pipeline_test.go | 21 ++++++++++----------- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/pipeline.go b/pipeline.go index 31eb344..d1b3060 100644 --- a/pipeline.go +++ b/pipeline.go @@ -64,6 +64,11 @@ func (p *Pipeline) AddStep(step Step) *Pipeline { return p } +// AddStepFromFunc appends the given function to the Pipeline at the end and returns itself. +func (p *Pipeline) AddStepFromFunc(name string, fn func(ctx context.Context) error) *Pipeline { + return p.AddStep(NewStepFromFunc(name, fn)) +} + // WithSteps appends the given array of steps to the Pipeline at the end and returns itself. func (p *Pipeline) WithSteps(steps ...Step) *Pipeline { p.steps = steps diff --git a/pipeline_test.go b/pipeline_test.go index 573e4eb..1de31ca 100644 --- a/pipeline_test.go +++ b/pipeline_test.go @@ -171,18 +171,17 @@ func TestPipeline_Run(t *testing.T) { } func TestPipeline_RunWithContext_CancelLongRunningStep(t *testing.T) { - p := NewPipeline().WithSteps( - NewStepFromFunc("long running", func(ctx context.Context) error { - for { - select { - case <-ctx.Done(): - return ctx.Err() - default: - // doing nothing - } + p := NewPipeline().AddStepFromFunc("long running", func(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + // doing nothing } - }), - ) + } + }) + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) go func() { time.Sleep(5 * time.Millisecond)