forked from redis/go-redis
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathexample_instrumentation_test.go
59 lines (50 loc) · 1.17 KB
/
example_instrumentation_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package redis_test
import (
	"fmt"
	"math"
	"sync/atomic"
	"time"

	"github.com/go-redis/redis"
)
// Example_instrumentation shows how to attach per-shard instrumentation to a
// Ring. It builds a Ring with a single shard ("shard1" at ":6379"), installs
// wrapRedisProcess on every shard client, and then issues Ping commands in an
// endless loop so the instrumentation has traffic to report on.
//
// The function never returns.
func Example_instrumentation() {
	opts := &redis.RingOptions{
		Addrs: map[string]string{
			"shard1": ":6379",
		},
	}
	ring := redis.NewRing(opts)

	// The callback always returns nil; ForEachShard's own result is not
	// checked here.
	instrument := func(shard *redis.Client) error {
		wrapRedisProcess(shard)
		return nil
	}
	ring.ForEachShard(instrument)

	// Generate a steady stream of commands; the Ping results are discarded.
	for {
		ring.Ping()
	}
}
// wrapRedisProcess instruments client by wrapping its command processing
// function. For every processed command it bumps a counter and folds the
// command's latency into an exponential moving average (weight 1/100 for the
// newest sample). A background goroutine prints both values every 3 seconds.
//
// The reporting goroutine loops over time.Tick and has no stop condition, so
// it keeps running after this function returns.
func wrapRedisProcess(client *redis.Client) {
	const (
		// precision is the unit in which the average is kept and reported.
		precision = time.Microsecond
		// decay is the weight given to the newest sample in the average.
		decay = 1.0 / 100
	)

	// avgBits holds the IEEE 754 bit pattern of a float32 average, expressed
	// in units of precision. Keeping the fractional part matters: an integer
	// average truncated on every update drifts downward and cannot rise
	// unless a sample exceeds it by at least 1/decay units, so sub-100µs
	// workloads would report 0. The zero value decodes to 0.0.
	var count, avgBits uint32

	go func() {
		for range time.Tick(3 * time.Second) {
			n := atomic.LoadUint32(&count)
			avg := math.Float32frombits(atomic.LoadUint32(&avgBits))
			// Truncate to whole precision units for display.
			dur := time.Duration(avg) * precision
			fmt.Printf("%s: processed=%d avg_dur=%s\n", client, n, dur)
		}
	}()

	client.WrapProcess(func(oldProcess func(redis.Cmder) error) func(redis.Cmder) error {
		return func(cmd redis.Cmder) error {
			start := time.Now()
			err := oldProcess(cmd)
			// Float division keeps the sub-unit part of the sample.
			sample := float64(time.Since(start)) / float64(precision)

			// Lock-free update: recompute from the latest stored value and
			// retry if another goroutine changed it in between.
			for {
				oldBits := atomic.LoadUint32(&avgBits)
				oldAvg := float64(math.Float32frombits(oldBits))
				newAvg := (1-decay)*oldAvg + decay*sample
				newBits := math.Float32bits(float32(newAvg))
				if atomic.CompareAndSwapUint32(&avgBits, oldBits, newBits) {
					break
				}
			}

			atomic.AddUint32(&count, 1)
			return err
		}
	})
}