-
Notifications
You must be signed in to change notification settings - Fork 32
/
Copy pathmiddleware.go
179 lines (154 loc) · 5.16 KB
/
middleware.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
package loomchain
import (
"encoding/base64"
"errors"
"fmt"
"runtime/debug"
"time"
"github.com/go-kit/kit/metrics"
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/loomnetwork/loomchain/log"
stdprometheus "github.com/prometheus/client_golang/prometheus"
)
type TxMiddleware interface {
ProcessTx(state State, txBytes []byte, next TxHandlerFunc, isCheckTx bool) (TxHandlerResult, error)
}
type TxMiddlewareFunc func(state State, txBytes []byte, next TxHandlerFunc, isCheckTx bool) (TxHandlerResult, error)
func (f TxMiddlewareFunc) ProcessTx(
state State, txBytes []byte, next TxHandlerFunc, isCheckTx bool,
) (TxHandlerResult, error) {
return f(state, txBytes, next, isCheckTx)
}
type PostCommitHandler func(state State, txBytes []byte, res TxHandlerResult, isCheckTx bool) error
type PostCommitMiddleware interface {
ProcessTx(state State, txBytes []byte, res TxHandlerResult, next PostCommitHandler, isCheckTx bool) error
}
type PostCommitMiddlewareFunc func(
state State, txBytes []byte, res TxHandlerResult, next PostCommitHandler, isCheckTx bool,
) error
func (f PostCommitMiddlewareFunc) ProcessTx(
state State, txBytes []byte, res TxHandlerResult, next PostCommitHandler, isCheckTx bool,
) error {
return f(state, txBytes, res, next, isCheckTx)
}
func MiddlewareTxHandler(
middlewares []TxMiddleware,
handler TxHandler,
postMiddlewares []PostCommitMiddleware,
) TxHandler {
postChain := func(state State, txBytes []byte, res TxHandlerResult, isCheckTx bool) error { return nil }
for i := len(postMiddlewares) - 1; i >= 0; i-- {
m := postMiddlewares[i]
localNext := postChain
postChain = func(state State, txBytes []byte, res TxHandlerResult, isCheckTx bool) error {
return m.ProcessTx(state, txBytes, res, localNext, isCheckTx)
}
}
next := TxHandlerFunc(func(state State, txBytes []byte, isCheckTx bool) (TxHandlerResult, error) {
result, err := handler.ProcessTx(state, txBytes, isCheckTx)
if err != nil {
return result, err
}
err = postChain(state, txBytes, result, isCheckTx)
return result, err
})
for i := len(middlewares) - 1; i >= 0; i-- {
m := middlewares[i]
// Need local var otherwise infinite loop occurs
nextLocal := next
next = func(state State, txBytes []byte, isCheckTx bool) (TxHandlerResult, error) {
return m.ProcessTx(state, txBytes, nextLocal, isCheckTx)
}
}
return next
}
var NoopTxHandler = TxHandlerFunc(func(state State, txBytes []byte, isCheckTx bool) (TxHandlerResult, error) {
return TxHandlerResult{}, nil
})
func rvalError(r interface{}) error {
var err error
switch x := r.(type) {
case string:
err = errors.New(x)
case error:
err = x
default:
err = errors.New("unknown panic")
}
return err
}
var RecoveryTxMiddleware = TxMiddlewareFunc(func(
state State,
txBytes []byte,
next TxHandlerFunc,
isCheckTx bool,
) (res TxHandlerResult, err error) {
defer func() {
if rval := recover(); rval != nil {
logger := log.Default
logger.Error("Panic in TX Handler", "rvalue", rval)
println(debug.Stack())
err = rvalError(rval)
}
}()
return next(state, txBytes, isCheckTx)
})
var LogTxMiddleware = TxMiddlewareFunc(func(
state State,
txBytes []byte,
next TxHandlerFunc,
isCheckTx bool,
) (TxHandlerResult, error) {
// TODO: We should set some transaction specific logging information
return next(state, txBytes, isCheckTx)
})
var LogPostCommitMiddleware = PostCommitMiddlewareFunc(func(
state State,
txBytes []byte,
res TxHandlerResult,
next PostCommitHandler,
isCheckTx bool,
) error {
log.Default.Info("Tx processed", "result", res, "payload", base64.StdEncoding.EncodeToString(txBytes))
return next(state, txBytes, res, isCheckTx)
})
// InstrumentingTxMiddleware maintains the state of metrics values internally
type InstrumentingTxMiddleware struct {
requestCount metrics.Counter
requestLatency metrics.Histogram
}
var _ TxMiddleware = &InstrumentingTxMiddleware{}
// NewInstrumentingTxMiddleware initializes the metrics and maintains the handler func
func NewInstrumentingTxMiddleware() TxMiddleware {
// initialize metrics
fieldKeys := []string{"method", "error"}
requestCount := kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: "loomchain",
Subsystem: "tx_service",
Name: "request_count",
Help: "Number of requests received.",
}, fieldKeys)
requestLatency := kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{
Namespace: "loomchain",
Subsystem: "tx_service",
Name: "request_latency_microseconds",
Help: "Total duration of requests in microseconds.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
}, fieldKeys)
return &InstrumentingTxMiddleware{
requestCount: requestCount,
requestLatency: requestLatency,
}
}
// ProcessTx captures metrics and implements TxMiddleware
func (m InstrumentingTxMiddleware) ProcessTx(
state State, txBytes []byte, next TxHandlerFunc, isCheckTx bool,
) (r TxHandlerResult, err error) {
defer func(begin time.Time) {
lvs := []string{"method", "Tx", "error", fmt.Sprint(err != nil)}
m.requestCount.With(lvs...).Add(1)
m.requestLatency.With(lvs...).Observe(time.Since(begin).Seconds())
}(time.Now())
r, err = next(state, txBytes, isCheckTx)
return
}