forked from pingcap/go-hbase
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathaction.go
111 lines (95 loc) · 2.51 KB
/
action.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package hbase
import (
"time"
pb "github.com/golang/protobuf/proto"
"github.com/juju/errors"
"github.com/ngaut/log"
"github.com/pingcap/go-hbase/proto"
)
type action interface {
ToProto() pb.Message
}
func (c *client) innerCall(table, row []byte, action action, useCache bool) (*call, error) {
region, err := c.LocateRegion(table, row, useCache)
if err != nil {
return nil, errors.Trace(err)
}
conn, err := c.getClientConn(region.Server)
if err != nil {
return nil, errors.Trace(err)
}
regionSpecifier := &proto.RegionSpecifier{
Type: proto.RegionSpecifier_REGION_NAME.Enum(),
Value: []byte(region.Name),
}
var cl *call
switch a := action.(type) {
case *Get:
cl = newCall(&proto.GetRequest{
Region: regionSpecifier,
Get: a.ToProto().(*proto.Get),
})
case *Put, *Delete:
cl = newCall(&proto.MutateRequest{
Region: regionSpecifier,
Mutation: a.ToProto().(*proto.MutationProto),
})
case *CoprocessorServiceCall:
cl = newCall(&proto.CoprocessorServiceRequest{
Region: regionSpecifier,
Call: a.ToProto().(*proto.CoprocessorServiceCall),
})
default:
return nil, errors.Errorf("Unknown action - %T - %v", action, action)
}
err = conn.call(cl)
if err != nil {
//释放资源
conn.close()
// If failed, remove bad server conn cache.
cachedKey := cachedConnKey(region.Server, ClientService)
c.mu.Lock()
delete(c.cachedConns, cachedKey)
c.mu.Unlock()
return nil, errors.Trace(err)
}
return cl, nil
}
func (c *client) innerDo(table, row []byte, action action, useCache bool) (pb.Message, error) {
// Try to create and send a new resuqest call.
cl, err := c.innerCall(table, row, action, useCache)
if err != nil {
log.Warnf("inner call failed - %v", errors.ErrorStack(err))
return nil, errors.Trace(err)
}
select {
case rsp := <-cl.responseCh:
return rsp, nil
case <-time.After(Timeout):
return nil, errors.Errorf("innerDo timeout")
}
}
func (c *client) do(table, row []byte, action action, useCache bool) (pb.Message, error) {
var (
result pb.Message
err error
)
LOOP:
for i := 0; i < c.maxRetries; i++ {
result, err = c.innerDo(table, row, action, useCache)
if err == nil {
switch r := result.(type) {
case *exception:
err = errors.New(r.msg)
// If get an execption response, clean old region cache.
c.CleanRegionCache(table)
default:
break LOOP
}
}
useCache = false
log.Warnf("Retrying action for the %d time(s), error - %v", i+1, errors.ErrorStack(err))
retrySleep(i + 1)
}
return result, errors.Trace(err)
}