-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpipeline.go
150 lines (131 loc) · 4.99 KB
/
pipeline.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package pipeline
import (
"context"
"fmt"
)
// Pipeline holds and runs intermediate actions, called "steps".
// T is the concrete context type that is handed to every step, predicate and handler.
type Pipeline[T context.Context] struct {
	// steps are executed sequentially, in slice order, when the pipeline runs.
	steps []Step[T]
	// beforeHooks are invoked with the step that is about to run, right before its action is called.
	beforeHooks []Listener[T]
	// finalizer, if non-nil, is called once after the run and its return value becomes the pipeline's error.
	finalizer ErrorHandler[T]
	// options tune pipeline behaviour (e.g. whether step errors get wrapped) and are handed down to nested pipelines.
	options Options
}
// Listener is a simple func that listens to Pipeline events.
// Listeners registered as before-hooks receive the Step that is about to be executed.
type Listener[T context.Context] func(step Step[T])
// ActionFunc is the func that contains your business logic.
// It receives the context the pipeline was started with.
// Returning a non-nil error stops the pipeline, unless the step's handler turns that error into nil.
type ActionFunc[T context.Context] func(ctx T) error
// ErrorHandler is a func that receives the context together with an error and returns the error to continue with.
// Used as a step handler, it is invoked after the step's ActionFunc has finished and is given that action's error,
// which is nil if the action succeeded; a non-nil return value aborts the pipeline.
// Used as a pipeline finalizer, it is invoked with the outcome of the whole run and its return value is what
// RunWithContext returns.
type ErrorHandler[T context.Context] func(ctx T, err error) error
// NewPipeline creates an empty Pipeline instance: it has no steps, no before-hooks,
// no finalizer and zero-value options.
func NewPipeline[T context.Context]() *Pipeline[T] {
	return new(Pipeline[T])
}
// WithBeforeHooks sets the given listeners as the pipeline's before-hooks and returns the pipeline itself.
// Any previously configured before-hooks are replaced.
// Each Listener is called once per executed step, in the given order, just before the step's ActionFunc is invoked.
// Steps that are skipped because their condition evaluates to false do not trigger the listeners.
// The listeners should return as fast as possible, as they are not intended to do actual business logic.
func (p *Pipeline[T]) WithBeforeHooks(listeners ...Listener[T]) *Pipeline[T] {
	p.beforeHooks = listeners
	return p
}
// AddStep appends the given step to the end of the Pipeline's step list and returns the Pipeline itself,
// so that calls can be chained.
func (p *Pipeline[T]) AddStep(step Step[T]) *Pipeline[T] {
	p.steps = append(p.steps, step)
	return p
}
// AddStepFromFunc wraps fn in a new step called name, appends that step to the end of the Pipeline
// and returns the Pipeline itself.
func (p *Pipeline[T]) AddStepFromFunc(name string, fn ActionFunc[T]) *Pipeline[T] {
	step := NewStep[T](name, fn)
	return p.AddStep(step)
}
// WithSteps sets the given steps as the Pipeline's steps and returns the Pipeline itself.
// Steps that were registered before (through AddStep, AddStepFromFunc or an earlier WithSteps call) are replaced,
// not extended; use AddStep to append to the existing steps.
// The given steps are copied, so later changes to the Pipeline never write into the caller's slice.
func (p *Pipeline[T]) WithSteps(steps ...Step[T]) *Pipeline[T] {
	// Copy instead of keeping the variadic slice: when called as WithSteps(s[:n]...), the slice
	// still has spare capacity and a later append in AddStep would overwrite s[n] for the caller.
	p.steps = append([]Step[T](nil), steps...)
	return p
}
// WithNestedSteps returns a Step called name that runs the given steps as a pipeline of their own.
// It is similar to AsNestedStep, but it accepts the steps given directly as parameters.
// The nested pipeline uses the before-hooks and options this Pipeline has at the moment the step is executed;
// this Pipeline's finalizer is not handed down.
// When predicate is non-nil then the steps are only executed if it evaluates to `true`.
func (p *Pipeline[T]) WithNestedSteps(name string, predicate Predicate[T], steps ...Step[T]) Step[T] {
	runNested := func(ctx T) error {
		// Built on every invocation so that hooks and options are read when the step actually runs.
		child := Pipeline[T]{steps: steps, beforeHooks: p.beforeHooks, options: p.options}
		return child.RunWithContext(ctx)
	}
	return NewStepIf[T](predicate, name, runNested)
}
// AsNestedStep converts the Pipeline instance into a Step called name that can be used in other pipelines.
// When the step is executed, a nested pipeline is run with the steps, before-hooks and options this Pipeline
// has at that moment; the finalizer is not handed to the nested pipeline.
func (p *Pipeline[T]) AsNestedStep(name string) Step[T] {
	runNested := func(ctx T) error {
		// Built on every invocation so that steps added after AsNestedStep was called are still included.
		child := Pipeline[T]{steps: p.steps, beforeHooks: p.beforeHooks, options: p.options}
		return child.RunWithContext(ctx)
	}
	return NewStep[T](name, runNested)
}
// WithFinalizer sets the finalizer for the pipeline and returns the pipeline itself.
// The finalizer is a handler that gets called once after the last step in the pipeline has completed.
// It is also called if the pipeline aborts early or gets canceled.
// The error returned by the finalizer is what RunWithContext returns.
func (p *Pipeline[T]) WithFinalizer(handler ErrorHandler[T]) *Pipeline[T] {
	p.finalizer = handler
	return p
}
// NewStep is syntactic sugar for the package-level NewStep, but with T already set to the Pipeline's context type.
// The returned Step is not added to the Pipeline.
func (p *Pipeline[T]) NewStep(name string, action ActionFunc[T]) Step[T] {
	return NewStep[T](name, action)
}
// When is syntactic sugar for NewStep combined with Step.When, with T already set to the Pipeline's context type.
// It delegates to NewStepIf; the returned Step is not added to the Pipeline.
func (p *Pipeline[T]) When(predicate Predicate[T], name string, action ActionFunc[T]) Step[T] {
	return NewStepIf[T](predicate, name, action)
}
// RunWithContext executes the Pipeline.
// Steps are executed sequentially, in the order in which they were added to the Pipeline.
// When the context gets canceled, a step that is currently running is not terminated; the pipeline merely
// skips the steps that have not started yet.
// The context is passed to each Step.Action, so a long-running step has to watch the context itself if it
// wants to be truly cancelable.
// If the pipeline gets canceled, the context's error is returned.
//
// If a finalizer is set, it is called with the outcome of the run and its return value is returned as is.
//
// All non-nil errors, except the error returned from the pipeline's finalizer, are wrapped in Result.
// This can be used to retrieve the metadata of the step that returned the error with errors.As:
//
//	err := p.RunWithContext(ctx)
//	var result pipeline.Result
//	if errors.As(err, &result) {
//		fmt.Println(result.Name())
//	}
func (p *Pipeline[T]) RunWithContext(ctx T) error {
	outcome := p.doRun(ctx)
	if p.finalizer == nil {
		return outcome
	}
	return p.finalizer(ctx, outcome)
}
// doRun executes the steps one after another.
// It returns the Result of the first step that fails, or nil if every executed step succeeded.
// Before each step the context is checked; once it is done, that step and all following ones are not
// started and the context's error is reported under the name of the step that was next in line.
func (p *Pipeline[T]) doRun(ctx T) Result {
	for _, current := range p.steps {
		// Non-blocking cancellation check before anything of this step is evaluated.
		select {
		case <-ctx.Done():
			return p.fail(ctx.Err(), current)
		default:
		}

		// A step whose condition evaluates to false is skipped without notifying the hooks.
		if current.Condition != nil && !current.Condition(ctx) {
			continue
		}

		for _, hook := range p.beforeHooks {
			hook(current)
		}

		actionErr := current.Action(ctx)
		if current.Handler != nil {
			// The handler sees the action's error even when it is nil and decides what to continue with.
			actionErr = current.Handler(ctx, actionErr)
		}
		if actionErr != nil {
			return p.fail(actionErr, current)
		}
	}
	return nil
}
// fail builds the Result for a step that did not complete successfully.
// Unless error wrapping is disabled in the pipeline's options, err is wrapped in a message that
// mentions the step's name; the original error stays reachable through unwrapping.
func (p *Pipeline[T]) fail(err error, step Step[T]) Result {
	reported := err
	if !p.options.DisableErrorWrapping {
		reported = fmt.Errorf("step '%s' failed: %w", step.Name, err)
	}
	return newResult(step.Name, reported)
}