client: Cache tikv request in tidb client side #1098

Merged Feb 21, 2024 · 33 commits
Changes from 15 commits

Commits
2d2427c
impl priority queue
bufferflies Dec 28, 2023
d249194
replace priority queue
bufferflies Dec 28, 2023
9a0739e
cache request in tidb side
bufferflies Dec 28, 2023
c68dbeb
fix gosimple
bufferflies Dec 28, 2023
7f3cc02
impl priority
bufferflies Dec 28, 2023
4f42240
Merge branch 'master' into cache_request
bufferflies Jan 3, 2024
8d1fca6
pass ut
bufferflies Jan 3, 2024
9e7283f
Merge branch 'cache_request' of github.com:bufferflies/client-go into…
bufferflies Jan 3, 2024
55e98de
Merge branch 'cache_request' of github.com:bufferflies/client-go into…
bufferflies Jan 3, 2024
e3a4cab
resolve conflict
bufferflies Jan 3, 2024
8e939c9
add comment
bufferflies Jan 3, 2024
cabd454
add
bufferflies Jan 5, 2024
709b464
remove request if the request has been canceled
bufferflies Jan 8, 2024
38d9dfd
remove request if it has been canceled
bufferflies Jan 8, 2024
5766a91
add comment for cancel
bufferflies Jan 8, 2024
3602ee7
not make the loop is busy
bufferflies Jan 15, 2024
2b9969c
Merge branch 'master' into cache_request
bufferflies Jan 16, 2024
66a3525
lint
bufferflies Jan 16, 2024
98824f1
revert busy loop
bufferflies Jan 17, 2024
2d3bbf0
add unit test
bufferflies Jan 18, 2024
1d876f1
not limit ehigh prioirty test
bufferflies Jan 18, 2024
c492f4e
pass lint
bufferflies Jan 18, 2024
e1cca1c
support all
bufferflies Jan 23, 2024
c85b218
Merge branch 'master' into cache_request
bufferflies Jan 24, 2024
d14af95
Merge branch 'master' into cache_request
bufferflies Jan 25, 2024
dbce3da
Merge branch 'master' into cache_request
bufferflies Jan 30, 2024
1ee7caa
add comment
bufferflies Jan 30, 2024
bbe0543
squash
bufferflies Jan 31, 2024
94a0d9c
revert all to All
bufferflies Jan 31, 2024
27aca2a
remove index from entry
bufferflies Jan 31, 2024
a01352e
make fail reasons more clear
bufferflies Feb 2, 2024
f48967c
Merge branch 'master' into cache_request
bufferflies Feb 5, 2024
85f81ab
resolve conflict
bufferflies Feb 20, 2024
7 changes: 6 additions & 1 deletion config/client.go
@@ -36,6 +36,7 @@ package config

import (
"fmt"
"math"
"time"

"google.golang.org/grpc/encoding/gzip"
@@ -87,6 +88,9 @@ type TiKVClient struct {
// TTLRefreshedTxnSize controls whether a transaction should update its TTL or not.
TTLRefreshedTxnSize int64 `toml:"ttl-refreshed-txn-size" json:"ttl-refreshed-txn-size"`
ResolveLockLiteThreshold uint64 `toml:"resolve-lock-lite-threshold" json:"resolve-lock-lite-threshold"`
// MaxConcurrencyRequestLimit is the max number of requests that can be sent to tikv but not yet responded to (in flight).
// 0 means auto-adjust by feedback.
MaxConcurrencyRequestLimit int64 `toml:"max-concurrency-request-limit" json:"max-concurrency-request-limit"`
Contributor:

This would be a public TiDB configuration. Should it be approved by a PM according to the current process requirements?

}

// AsyncCommit is the config for the async commit feature. The switch to enable it is a system variable.
@@ -155,7 +159,8 @@ func DefaultTiKVClient() TiKVClient {
},
CoprReqTimeout: 60 * time.Second,

ResolveLockLiteThreshold: 16,
ResolveLockLiteThreshold: 16,
MaxConcurrencyRequestLimit: math.MaxInt64,
}
}
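
For reference, a minimal sketch of how a caller could tune the new limit. It assumes the client-go v2 import path and that the `config.UpdateGlobal` helper is available; the field name and toml key come from this diff, everything else is illustrative:

```go
package main

import (
	"fmt"

	"github.com/tikv/client-go/v2/config"
)

func main() {
	// Lower the per-client in-flight request cap from the default above
	// (math.MaxInt64, i.e. effectively unlimited) to a concrete bound.
	// The toml key added in this diff is `max-concurrency-request-limit`.
	config.UpdateGlobal(func(c *config.Config) {
		c.TiKVClient.MaxConcurrencyRequestLimit = 1024
	})
	fmt.Println(config.GetGlobalConfig().TiKVClient.MaxConcurrencyRequestLimit)
}
```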

5 changes: 3 additions & 2 deletions internal/client/client.go
@@ -315,6 +315,7 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint
dialTimeout: a.dialTimeout,
tryLock: tryLock{sync.NewCond(new(sync.Mutex)), false},
}
batchClient.maxConcurrencyRequestLimit.Store(cfg.TiKVClient.MaxConcurrencyRequestLimit)
a.batchCommandsClients = append(a.batchCommandsClients, batchClient)
}
}
@@ -621,9 +622,9 @@ func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.R
// TiDB RPC server supports batch RPC, but the batch connection sends heartbeats, which is unnecessary since
// requests to TiDB are not high frequency.
if config.GetGlobalConfig().TiKVClient.MaxBatchSize > 0 && enableBatch {
if batchReq := req.ToBatchCommandsRequest(); batchReq != nil {
if batchReq, pri := req.ToBatchCommandsRequest(); batchReq != nil {
defer trace.StartRegion(ctx, req.Type.String()).End()
return sendBatchRequest(ctx, addr, req.ForwardedHost, connArray.batchConn, batchReq, timeout)
return sendBatchRequest(ctx, addr, req.ForwardedHost, connArray.batchConn, batchReq, timeout, pri)
}
}

81 changes: 58 additions & 23 deletions internal/client/client_batch.go
@@ -70,15 +70,15 @@ type batchCommandsEntry struct {
// canceled indicates whether the request has been canceled.
canceled int32
err error
pri uint64
}

func (b *batchCommandsEntry) isCanceled() bool {
return atomic.LoadInt32(&b.canceled) == 1
}

// TODO: implement by the request priority.
func (b *batchCommandsEntry) priority() int {
return 0
func (b *batchCommandsEntry) priority() uint64 {
return b.pri
}

func (b *batchCommandsEntry) error(err error) {
@@ -107,14 +107,14 @@ func (b *batchCommandsBuilder) push(entry *batchCommandsEntry) {
b.entries.Push(entry)
}

// build builds BatchCommandsRequests and calls collect() for each valid entry.
// buildWithLimit builds BatchCommandsRequests and calls collect() for each valid entry.
// The first return value is the request that doesn't need forwarding.
// The second is a map that maps forwarded hosts to requests.
func (b *batchCommandsBuilder) build(
collect func(id uint64, e *batchCommandsEntry),
func (b *batchCommandsBuilder) buildWithLimit(limit int64, collect func(id uint64, e *batchCommandsEntry),
) (*tikvpb.BatchCommandsRequest, map[string]*tikvpb.BatchCommandsRequest) {
for _, entry := range b.entries.All() {
e := entry.(*batchCommandsEntry)
pri, pending := uint64(0), b.entries.Len()
for count, i := int64(0), 0; i < pending; i++ {
e := b.entries.Pop().(*batchCommandsEntry)
if e.isCanceled() {
continue
}
@@ -133,7 +133,15 @@ func (b *batchCommandsBuilder) build(
batchReq.RequestIds = append(batchReq.RequestIds, b.idAlloc)
batchReq.Requests = append(batchReq.Requests, e.req)
}
if count == 0 {
pri = e.priority()
}
count++
b.idAlloc++
// keep one batch for each priority; don't mix requests with different priorities into one batch.
if count >= limit || e.priority() != pri {
break
}
}
var req *tikvpb.BatchCommandsRequest
if len(b.requests) > 0 {
@@ -145,20 +153,22 @@ func (b *batchCommandsBuilder) build(
return req, b.forwardingReqs
}
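
To make the selection rule above easier to follow, here is a standalone sketch of the intent: take pending entries in order and stop once the per-send limit is reached or the priority changes, so each batch carries a single priority level. It is deliberately simplified (a plain slice instead of the PR's priority queue, and the cutoff is checked before appending, whereas the loop above breaks after appending), with hypothetical names throughout:

```go
package main

import "fmt"

type entry struct {
	id  uint64
	pri uint64
}

// takeBatch returns the next batch: at most `limit` entries, all sharing the
// priority of the first pending entry, plus the entries left for later sends.
func takeBatch(pending []entry, limit int64) (batch, rest []entry) {
	var count int64
	var pri uint64
	for i, e := range pending {
		if count == 0 {
			pri = e.pri // the first entry fixes this batch's priority
		}
		if count >= limit || e.pri != pri {
			return pending[:i], pending[i:]
		}
		count++
	}
	return pending, nil
}

func main() {
	pending := []entry{{1, 8}, {2, 8}, {3, 8}, {4, 1}}
	batch, rest := takeBatch(pending, 16)
	fmt.Println(len(batch), len(rest)) // 3 1: the lower-priority entry waits for a later batch
}
```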

// cancel all requests, only used in test.
func (b *batchCommandsBuilder) cancel(e error) {
for _, entry := range b.entries.All() {
entry.(*batchCommandsEntry).error(e)
}
b.entries.Reset()
}

// reset resets the builder to the initial state.
// Should call it before collecting a new batch.
func (b *batchCommandsBuilder) reset() {
b.entries.clean()
// NOTE: We can't simply set entries = entries[:0] here.
// The data in the cap part of the slice would reference the prewrite keys whose
// underlying memory is borrowed from memdb. The reference cause GC can't release
// the memdb, leading to serious memory leak problems in the large transaction case.
b.entries.Reset()
for i := 0; i < len(b.requests); i++ {
b.requests[i] = nil
}
@@ -336,8 +346,7 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) {
a.fetchMorePendingRequests(int(cfg.MaxBatchSize), int(bestBatchWaitSize), cfg.MaxBatchWaitTime)
}
}
a.pendingRequests.Observe(float64(len(a.batchCommandsCh)))
a.batchSize.Observe(float64(a.reqBuilder.len()))
a.pendingRequests.Observe(float64(len(a.batchCommandsCh) + a.reqBuilder.len()))
length := a.reqBuilder.len()
if uint(length) == 0 {
// The batch command channel is closed.
@@ -349,12 +358,20 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) {
bestBatchWaitSize++
}

a.getClientAndSend()
batch := a.getClientAndSend()
if batch != 0 {
a.batchSize.Observe(float64(a.reqBuilder.len()))
}
metrics.TiKVBatchSendLatency.Observe(float64(time.Since(start)))
}
}

func (a *batchConn) getClientAndSend() {
const (
SendFailedReasonNoAvailableLimit = "no available limit"
SendFailedReasonTryLockForSendFail = "tryLockForSend fail"
)

func (a *batchConn) getClientAndSend() int {
if val, err := util.EvalFailpoint("mockBatchClientSendDelay"); err == nil {
if timeout, ok := val.(int); ok && timeout > 0 {
time.Sleep(time.Duration(timeout * int(time.Millisecond)))
@@ -366,37 +383,47 @@ func (a *batchConn) getClientAndSend() {
cli *batchCommandsClient
target string
)
reason := ""
for i := 0; i < len(a.batchCommandsClients); i++ {
a.index = (a.index + 1) % uint32(len(a.batchCommandsClients))
target = a.batchCommandsClients[a.index].target
// The lock protects the batchCommandsClient from been closed while it's in use.
if a.batchCommandsClients[a.index].tryLockForSend() {
cli = a.batchCommandsClients[a.index]
break
if c := a.batchCommandsClients[a.index]; c.tryLockForSend() {
if c.sent.Load() <= c.maxConcurrencyRequestLimit.Load() {
cli = c
break
} else {
reason = SendFailedReasonNoAvailableLimit
c.unlockForSend()
}
} else {
reason = SendFailedReasonTryLockForSendFail
}
}
if cli == nil {
logutil.BgLogger().Warn("no available connections", zap.String("target", target))
logutil.BgLogger().Warn("no available connections", zap.String("target", target), zap.String("reason", reason))
metrics.TiKVNoAvailableConnectionCounter.Inc()

// Please ensure the error is handled in region cache correctly.
a.reqBuilder.cancel(errors.New("no available connections"))
Contributor:

After this change these requests are not canceled, and they are only retried when a new request arrives. So they will be blocked if there are no new incoming requests. Is that the intended behavior?

Another issue: if maxConcurrencyRequestLimit is not very large, the request builder can cache a lot of requests when the incoming request volume is high, which may lead to issues such as OOM.

Contributor Author:

> After this change these requests are not canceled ... Is that the intended behavior?

Yes, the requests may be blocked if the configured limit is small and no new requests arrive. They will time out and then be retried. I will fix this with a notification mechanism.

> Another issue ... may lead to issues such as OOM.

Yes, that can also happen with the original logic; the request objects can be garbage-collected after their responses are received.

Contributor:

For the first issue, we may handle it in fetchAllPendingRequests. That is, we can skip waiting for headEntry when entries is not empty.

Contributor Author:

It may cause a busy loop if no client has send tokens available. I will optimize it by using a channel to notify the sender to send requests again.

return
return 0
}
defer cli.unlockForSend()

req, forwardingReqs := a.reqBuilder.build(func(id uint64, e *batchCommandsEntry) {
available := cli.maxConcurrencyRequestLimit.Load() - cli.sent.Load()
batch := 0
req, forwardingReqs := a.reqBuilder.buildWithLimit(available, func(id uint64, e *batchCommandsEntry) {
cli.batched.Store(id, e)
cli.sent.Add(1)
if trace.IsEnabled() {
trace.Log(e.ctx, "rpc", "send")
}
})
if req != nil {
batch += len(req.RequestIds)
cli.send("", req)
}
for forwardedHost, req := range forwardingReqs {
batch += len(req.RequestIds)
cli.send(forwardedHost, req)
}
return batch
}

type tryLock struct {
Expand Down Expand Up @@ -507,6 +534,10 @@ type batchCommandsClient struct {
closed int32
// tryLock protects client when re-create the streaming.
tryLock
// sent is the number of requests that have been sent to tikv but have not yet received a response.
sent atomic.Int64
// maxConcurrencyRequestLimit is the max number of requests that can be in flight (sent but not yet responded to).
maxConcurrencyRequestLimit atomic.Int64
}

func (c *batchCommandsClient) isStopped() bool {
Expand Down Expand Up @@ -549,6 +580,7 @@ func (c *batchCommandsClient) failPendingRequests(err error) {
id, _ := key.(uint64)
entry, _ := value.(*batchCommandsEntry)
c.batched.Delete(id)
c.sent.Add(-1)
entry.error(err)
return true
})
Expand Down Expand Up @@ -661,6 +693,7 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient, tikvTransport
entry.res <- responses[i]
}
c.batched.Delete(requestID)
c.sent.Add(-1)
}

transportLayerLoad := resp.GetTransportLayerLoad()
Expand Down Expand Up @@ -779,6 +812,7 @@ func sendBatchRequest(
batchConn *batchConn,
req *tikvpb.BatchCommandsRequest_Request,
timeout time.Duration,
priority uint64,
) (*tikvrpc.Response, error) {
entry := &batchCommandsEntry{
ctx: ctx,
Expand All @@ -787,6 +821,7 @@ func sendBatchRequest(
forwardedHost: forwardedHost,
canceled: 0,
err: nil,
pri: priority,
}
timer := time.NewTimer(timeout)
defer timer.Stop()
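A minimal, self-contained sketch of the in-flight accounting this file adds around batchCommandsClient: `sent` goes up when an entry is handed to the stream and back down when its response arrives or the pending request fails, and a send is only attempted while budget remains. The type and helpers below are hypothetical; the diff itself gates client selection with `sent <= maxConcurrencyRequestLimit` under the client's send lock and then passes `limit - sent` to buildWithLimit:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type inflightLimiter struct {
	sent  atomic.Int64 // requests sent but not yet answered
	limit atomic.Int64 // maximum allowed in-flight requests
}

// tryAcquire reserves a slot if one is available.
func (l *inflightLimiter) tryAcquire() bool {
	if l.sent.Load() >= l.limit.Load() {
		return false
	}
	l.sent.Add(1)
	return true
}

// release frees a slot once a response is received or the request fails.
func (l *inflightLimiter) release() {
	l.sent.Add(-1)
}

func main() {
	var l inflightLimiter
	l.limit.Store(2)
	fmt.Println(l.tryAcquire(), l.tryAcquire(), l.tryAcquire()) // true true false
	l.release()
	fmt.Println(l.tryAcquire()) // true again once a response frees a slot
}
```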
40 changes: 26 additions & 14 deletions internal/client/client_test.go
@@ -37,6 +37,7 @@ package client
import (
"context"
"fmt"
"math"
"math/rand"
"runtime"
"strconv"
@@ -111,10 +112,10 @@ func TestCancelTimeoutRetErr(t *testing.T) {

ctx, cancel := context.WithCancel(context.TODO())
cancel()
_, err := sendBatchRequest(ctx, "", "", a, req, 2*time.Second)
_, err := sendBatchRequest(ctx, "", "", a, req, 2*time.Second, 0)
assert.Equal(t, errors.Cause(err), context.Canceled)

_, err = sendBatchRequest(context.Background(), "", "", a, req, 0)
_, err = sendBatchRequest(context.Background(), "", "", a, req, 0, 0)
assert.Equal(t, errors.Cause(err), context.DeadlineExceeded)
}

Expand All @@ -134,8 +135,8 @@ func TestSendWhenReconnect(t *testing.T) {
}

req := tikvrpc.NewRequest(tikvrpc.CmdEmpty, &tikvpb.BatchCommandsEmptyRequest{})
_, err = rpcClient.SendRequest(context.Background(), addr, req, 100*time.Second)
assert.True(t, err.Error() == "no available connections")
_, err = rpcClient.SendRequest(context.Background(), addr, req, 5*time.Second)
assert.True(t, strings.Contains(err.Error(), "timeout"))
server.Stop()
}

@@ -386,7 +387,7 @@ func TestBatchCommandsBuilder(t *testing.T) {
assert.Equal(t, builder.len(), i+1)
}
entryMap := make(map[uint64]*batchCommandsEntry)
batchedReq, forwardingReqs := builder.build(func(id uint64, e *batchCommandsEntry) {
batchedReq, forwardingReqs := builder.buildWithLimit(math.MaxInt64, func(id uint64, e *batchCommandsEntry) {
entryMap[id] = e
})
assert.Equal(t, len(batchedReq.GetRequests()), 10)
@@ -412,7 +413,7 @@ func TestBatchCommandsBuilder(t *testing.T) {
}
}
entryMap = make(map[uint64]*batchCommandsEntry)
batchedReq, forwardingReqs = builder.build(func(id uint64, e *batchCommandsEntry) {
batchedReq, forwardingReqs = builder.buildWithLimit(math.MaxInt64, func(id uint64, e *batchCommandsEntry) {
entryMap[id] = e
})
assert.Equal(t, len(batchedReq.GetRequests()), 1)
@@ -422,8 +423,8 @@ func TestBatchCommandsBuilder(t *testing.T) {
assert.Equal(t, len(forwardingReqs[host].GetRequests()), i+2)
assert.Equal(t, len(forwardingReqs[host].GetRequestIds()), i+2)
}
assert.Equal(t, builder.idAlloc, uint64(10+builder.len()))
assert.Equal(t, len(entryMap), builder.len())
assert.Equal(t, int(builder.idAlloc), 20)
assert.Equal(t, len(entryMap), 10)
for host, forwardingReq := range forwardingReqs {
for i, id := range forwardingReq.GetRequestIds() {
assert.Equal(t, entryMap[id].req, forwardingReq.GetRequests()[i])
@@ -444,7 +445,7 @@ func TestBatchCommandsBuilder(t *testing.T) {
builder.push(entry)
}
entryMap = make(map[uint64]*batchCommandsEntry)
batchedReq, forwardingReqs = builder.build(func(id uint64, e *batchCommandsEntry) {
batchedReq, forwardingReqs = builder.buildWithLimit(math.MaxInt64, func(id uint64, e *batchCommandsEntry) {
entryMap[id] = e
})
assert.Equal(t, len(batchedReq.GetRequests()), 2)
Expand Down Expand Up @@ -475,7 +476,6 @@ func TestBatchCommandsBuilder(t *testing.T) {
// Test reset
builder.reset()
assert.Equal(t, builder.len(), 0)
assert.Equal(t, builder.entries.Len(), 0)
assert.Equal(t, len(builder.requests), 0)
assert.Equal(t, len(builder.requestIDs), 0)
assert.Equal(t, len(builder.forwardingReqs), 0)
@@ -665,7 +665,7 @@ func TestBatchClientRecoverAfterServerRestart(t *testing.T) {
assert.Nil(t, err)
// send some request, it should be success.
for i := 0; i < 100; i++ {
_, err = sendBatchRequest(context.Background(), addr, "", conn.batchConn, req, time.Second*20)
_, err = sendBatchRequest(context.Background(), addr, "", conn.batchConn, req, time.Second*20, 0)
require.NoError(t, err)
}

@@ -674,8 +674,8 @@ func TestBatchClientRecoverAfterServerRestart(t *testing.T) {
require.False(t, server.IsRunning())

// send some request, it should be failed since server is down.
for i := 0; i < 200; i++ {
_, err = sendBatchRequest(context.Background(), addr, "", conn.batchConn, req, time.Second*20)
for i := 0; i < 10; i++ {
_, err = sendBatchRequest(context.Background(), addr, "", conn.batchConn, req, time.Millisecond*100, 0)
require.Error(t, err)
time.Sleep(time.Millisecond * time.Duration(rand.Intn(300)))
grpcConn := conn.Get()
@@ -718,7 +718,19 @@ func TestBatchClientRecoverAfterServerRestart(t *testing.T) {

// send some request, it should be success again.
for i := 0; i < 100; i++ {
_, err = sendBatchRequest(context.Background(), addr, "", conn.batchConn, req, time.Second*20)
_, err = sendBatchRequest(context.Background(), addr, "", conn.batchConn, req, time.Second*20, 0)
require.NoError(t, err)
}
}

func TestLimitConcurrency(t *testing.T) {
re := require.New(t)
batch := newBatchConn(1, 128, nil)
for i := 0; i < 100; i++ {
batch.reqBuilder.push(&batchCommandsEntry{req: &tikvpb.BatchCommandsRequest_Request{}})
}
re.Equal(100, batch.reqBuilder.len())
req, _ := batch.reqBuilder.buildWithLimit(1, func(_ uint64, _ *batchCommandsEntry) {})
re.Len(req.RequestIds, 1)
re.Equal(99, batch.reqBuilder.len())
}