forked from wangaoone/redeo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpool.go
92 lines (78 loc) · 1.8 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
// Package client implements a minimalist client
// for working with Redis servers.
package client
import (
"net"
"sync"
"github.com/bsm/pool"
"github.com/mason-leap-lab/redeo/resp"
)
// Pool hands out client connections backed by an underlying
// connection pool, and keeps request writers and response readers
// around so they can be reused across connections.
type Pool struct {
	// conns is the underlying pool of network connections.
	conns *pool.Pool

	// readers and writers cache the response readers and request
	// writers of connections that were handed back via Put.
	readers, writers sync.Pool
}
// New builds a Pool whose connections are created through dialer.
//
// opt is passed unchanged to pool.New. When dialer is nil, connections
// are dialed over TCP to 127.0.0.1:6379. Any error returned by pool.New
// is returned as-is together with a nil Pool.
func New(opt *pool.Options, dialer func() (net.Conn, error)) (*Pool, error) {
	if dialer == nil {
		// No dialer supplied: fall back to the local default address.
		dialer = func() (net.Conn, error) { return net.Dial("tcp", "127.0.0.1:6379") }
	}

	inner, err := pool.New(opt, dialer)
	if err != nil {
		return nil, err
	}

	p := new(Pool)
	p.conns = inner
	return p, nil
}
// Get fetches a connection from the underlying pool and wraps it
// together with a request writer and a response reader bound to it.
// An error from the underlying pool is returned unchanged.
func (p *Pool) Get() (Conn, error) {
	raw, err := p.conns.Get()
	if err != nil {
		return nil, err
	}

	// Writer first, then reader: both are attached to the fresh connection.
	w := p.newRequestWriter(raw)
	r := p.newResponseReader(raw)

	return &conn{Conn: raw, RequestWriter: w, ResponseReader: r}, nil
}
// Put hands a connection back to the pool.
// Call it after every call/pipeline, and do not use the
// connection again once it has been passed in.
//
// Values that are not of this package's connection type are ignored.
// A connection marked as failed is closed instead of being recycled.
func (p *Pool) Put(cn Conn) {
	cs, ok := cn.(*conn)
	switch {
	case !ok:
		// Not one of our connections; nothing to recycle.
		return
	case cs.failed:
		// The connection is being discarded, so the close error is
		// deliberately ignored.
		_ = cs.Close()
		return
	}

	// Healthy connection: recycle the writer, the reader and the
	// underlying connection.
	p.writers.Put(cs.RequestWriter)
	p.readers.Put(cs.ResponseReader)
	p.conns.Put(cs.Conn)
}
// Close closes the underlying connection pool and reports
// whatever error that operation returns.
func (p *Pool) Close() error {
	err := p.conns.Close()
	return err
}
// newRequestWriter returns a request writer attached to cn. A cached
// writer is reset onto cn when one is available; otherwise a new
// writer is created.
func (p *Pool) newRequestWriter(cn net.Conn) *resp.RequestWriter {
	cached := p.writers.Get()
	if cached == nil {
		// Nothing cached: allocate a new writer.
		return resp.NewRequestWriter(cn)
	}

	w := cached.(*resp.RequestWriter)
	w.Reset(cn)
	return w
}
// newResponseReader returns a response reader attached to cn. A cached
// reader is reset onto cn when one is available; otherwise a new
// reader is created.
func (p *Pool) newResponseReader(cn net.Conn) resp.ResponseReader {
	cached := p.readers.Get()
	if cached == nil {
		// Nothing cached: allocate a new reader.
		return resp.NewResponseReader(cn)
	}

	r := cached.(resp.ResponseReader)
	r.Reset(cn)
	return r
}