Revert "client: Cache tikv request in tidb client side (tikv#1098)"
This reverts commit 824302a.

Signed-off-by: crazycs520 <[email protected]>
crazycs520 committed May 22, 2024
1 parent 2d514cf commit 4fb9989
Showing 7 changed files with 99 additions and 320 deletions.
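
For context: the change being reverted (tikv#1098) made each batch-commands connection track how many requests it had in flight (`sent`) against a configurable cap (`maxConcurrencyRequestLimit`), pulled more work only while `available()` stayed positive, and let high-priority tasks bypass the cap. This revert drops that accounting. A minimal, self-contained sketch of the idea — illustrative only, not the client-go implementation — looks like this:

package main

import (
	"fmt"
	"sync/atomic"
)

// limitedSender mimics the per-connection accounting that this revert removes:
// sent counts in-flight requests, limit is the configured cap, and a new batch
// is only pulled while available() > 0.
type limitedSender struct {
	sent  atomic.Int64
	limit atomic.Int64
}

func (s *limitedSender) available() int64 {
	return s.limit.Load() - s.sent.Load()
}

// trySend reserves one slot; the caller releases it (sent.Add(-1)) when the
// response or an error comes back.
func (s *limitedSender) trySend() bool {
	if s.available() <= 0 {
		return false // back-pressure: leave the request queued in the builder
	}
	s.sent.Add(1)
	return true
}

func main() {
	s := &limitedSender{}
	s.limit.Store(2)
	fmt.Println(s.trySend(), s.trySend(), s.trySend()) // true true false
}
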
8 changes: 2 additions & 6 deletions config/client.go
@@ -97,9 +97,6 @@ type TiKVClient struct {
// TTLRefreshedTxnSize controls whether a transaction should update its TTL or not.
TTLRefreshedTxnSize int64 `toml:"ttl-refreshed-txn-size" json:"ttl-refreshed-txn-size"`
ResolveLockLiteThreshold uint64 `toml:"resolve-lock-lite-threshold" json:"resolve-lock-lite-threshold"`
- // MaxConcurrencyRequestLimit is the max concurrency number of request to be sent the tikv
- // 0 means auto adjust by feedback.
- MaxConcurrencyRequestLimit int64 `toml:"max-concurrency-request-limit" json:"max-concurrency-request-limit"`
// EnableReplicaSelectorV2 indicate whether to use the new replica-selector-v2.
// TODO(crazycs520): remove this config after the new replica-selector-v2 is stable.
EnableReplicaSelectorV2 bool `toml:"enable-replica-selector-v2" json:"enable-replica-selector-v2"`
@@ -174,9 +171,8 @@ func DefaultTiKVClient() TiKVClient {
},
CoprReqTimeout: 60 * time.Second,

- ResolveLockLiteThreshold: 16,
- MaxConcurrencyRequestLimit: DefMaxConcurrencyRequestLimit,
- EnableReplicaSelectorV2: true,
+ ResolveLockLiteThreshold: 16,
+ EnableReplicaSelectorV2: true,
}
}

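
The options in this config fragment are exposed through paired toml/json tags, as the surviving lines above show. A small sketch of how those tags shape the serialized form, using a stand-in struct that carries only the two options kept by the revert (not the real TiKVClient type):

package main

import (
	"encoding/json"
	"fmt"
)

// clientOptions is a stand-in for the TiKVClient fragment in the diff above;
// only the tag names and the restored defaults are taken from it.
type clientOptions struct {
	ResolveLockLiteThreshold uint64 `json:"resolve-lock-lite-threshold"`
	EnableReplicaSelectorV2  bool   `json:"enable-replica-selector-v2"`
}

func main() {
	defaults := clientOptions{
		ResolveLockLiteThreshold: 16,
		EnableReplicaSelectorV2:  true,
	}
	out, _ := json.MarshalIndent(defaults, "", "  ")
	fmt.Println(string(out)) // keys follow the kebab-case tags from the config
}
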
1 change: 1 addition & 0 deletions examples/gcworker/go.mod
@@ -18,6 +18,7 @@ require (
github.com/google/btree v1.1.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 // indirect
+ github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect
4 changes: 1 addition & 3 deletions internal/client/client.go
@@ -365,7 +365,6 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint
tryLock: tryLock{sync.NewCond(new(sync.Mutex)), false},
eventListener: eventListener,
}
- batchClient.maxConcurrencyRequestLimit.Store(cfg.TiKVClient.MaxConcurrencyRequestLimit)
a.batchCommandsClients = append(a.batchCommandsClients, batchClient)
}
}
@@ -686,11 +685,10 @@ func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.R

// TiDB RPC server supports batch RPC, but batch connection will send heart beat, It's not necessary since
// request to TiDB is not high frequency.
- pri := req.GetResourceControlContext().GetOverridePriority()
if config.GetGlobalConfig().TiKVClient.MaxBatchSize > 0 && enableBatch {
if batchReq := req.ToBatchCommandsRequest(); batchReq != nil {
defer trace.StartRegion(ctx, req.Type.String()).End()
- return wrapErrConn(sendBatchRequest(ctx, addr, req.ForwardedHost, connArray.batchConn, batchReq, timeout, pri))
+ return wrapErrConn(sendBatchRequest(ctx, addr, req.ForwardedHost, connArray.batchConn, batchReq, timeout))
}
}

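
The hunk above drops the extra priority argument from the sendBatchRequest call and keeps the existing dispatch rule: a request takes the batched path only when max-batch-size is positive, the caller allows batching, and the request can be converted to a batch-commands entry; otherwise it falls back to a plain unary call. A rough sketch of that decision with placeholder types (not the real tikvrpc/tikvpb ones):

package main

import "fmt"

// request is a placeholder for *tikvrpc.Request; toBatch mimics
// ToBatchCommandsRequest returning nil when a request has no batched form.
type request struct {
	batchable bool
}

func (r *request) toBatch() *struct{} {
	if !r.batchable {
		return nil
	}
	return &struct{}{}
}

// dispatch mirrors the shape of the check in sendRequest: fall back to the
// unary path unless batching is enabled and the request supports it.
func dispatch(maxBatchSize uint, enableBatch bool, r *request) string {
	if maxBatchSize > 0 && enableBatch {
		if b := r.toBatch(); b != nil {
			return "batched stream"
		}
	}
	return "unary gRPC call"
}

func main() {
	fmt.Println(dispatch(128, true, &request{batchable: true}))  // batched stream
	fmt.Println(dispatch(128, true, &request{batchable: false})) // unary gRPC call
	fmt.Println(dispatch(0, true, &request{batchable: true}))    // unary gRPC call
}
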
136 changes: 36 additions & 100 deletions internal/client/client_batch.go
@@ -70,15 +70,15 @@ type batchCommandsEntry struct {
// canceled indicated the request is canceled or not.
canceled int32
err error
- pri uint64
}

func (b *batchCommandsEntry) isCanceled() bool {
return atomic.LoadInt32(&b.canceled) == 1
}

- func (b *batchCommandsEntry) priority() uint64 {
- return b.pri
+ // TODO: implement by the request priority.
+ func (b *batchCommandsEntry) priority() int {
+ return 0
}

func (b *batchCommandsEntry) error(err error) {
@@ -107,58 +107,33 @@ func (b *batchCommandsBuilder) push(entry *batchCommandsEntry) {
b.entries.Push(entry)
}

- const highTaskPriority = 10
-
- func (b *batchCommandsBuilder) hasHighPriorityTask() bool {
- return b.entries.highestPriority() >= highTaskPriority
- }
-
- // buildWithLimit builds BatchCommandsRequests with the given limit.
- // the highest priority tasks don't consume any limit,
- // so the limit only works for normal tasks.
+ // build builds BatchCommandsRequests and calls collect() for each valid entry.
// The first return value is the request that doesn't need forwarding.
// The second is a map that maps forwarded hosts to requests.
- func (b *batchCommandsBuilder) buildWithLimit(limit int64, collect func(id uint64, e *batchCommandsEntry),
+ func (b *batchCommandsBuilder) build(
+ collect func(id uint64, e *batchCommandsEntry),
) (*tikvpb.BatchCommandsRequest, map[string]*tikvpb.BatchCommandsRequest) {
- count := int64(0)
- build := func(reqs []Item) {
- for _, e := range reqs {
- e := e.(*batchCommandsEntry)
- if e.isCanceled() {
- continue
- }
- if e.priority() < highTaskPriority {
- count++
- }
-
- if collect != nil {
- collect(b.idAlloc, e)
- }
- if e.forwardedHost == "" {
- b.requestIDs = append(b.requestIDs, b.idAlloc)
- b.requests = append(b.requests, e.req)
- } else {
- batchReq, ok := b.forwardingReqs[e.forwardedHost]
- if !ok {
- batchReq = &tikvpb.BatchCommandsRequest{}
- b.forwardingReqs[e.forwardedHost] = batchReq
- }
- batchReq.RequestIds = append(batchReq.RequestIds, b.idAlloc)
- batchReq.Requests = append(batchReq.Requests, e.req)
- }
- b.idAlloc++
+ for _, entry := range b.entries.All() {
+ e := entry.(*batchCommandsEntry)
+ if e.isCanceled() {
+ continue
}
- }
- for (count < limit && b.entries.Len() > 0) || b.hasHighPriorityTask() {
- n := limit
- if limit == 0 {
- n = 1
+ if collect != nil {
+ collect(b.idAlloc, e)
}
- reqs := b.entries.Take(int(n))
- if len(reqs) == 0 {
- break
+ if e.forwardedHost == "" {
+ b.requestIDs = append(b.requestIDs, b.idAlloc)
+ b.requests = append(b.requests, e.req)
+ } else {
+ batchReq, ok := b.forwardingReqs[e.forwardedHost]
+ if !ok {
+ batchReq = &tikvpb.BatchCommandsRequest{}
+ b.forwardingReqs[e.forwardedHost] = batchReq
+ }
+ batchReq.RequestIds = append(batchReq.RequestIds, b.idAlloc)
+ batchReq.Requests = append(batchReq.Requests, e.req)
}
- build(reqs)
+ b.idAlloc++
}
var req *tikvpb.BatchCommandsRequest
if len(b.requests) > 0 {
@@ -170,22 +145,20 @@ func (b *batchCommandsBuilder) buildWithLimit(limit int64, collect func(id uint6
return req, b.forwardingReqs
}

- // cancel all requests, only used in test.
func (b *batchCommandsBuilder) cancel(e error) {
- for _, entry := range b.entries.all() {
+ for _, entry := range b.entries.All() {
entry.(*batchCommandsEntry).error(e)
}
- b.entries.reset()
}

// reset resets the builder to the initial state.
// Should call it before collecting a new batch.
func (b *batchCommandsBuilder) reset() {
- b.entries.clean()
// NOTE: We can't simply set entries = entries[:0] here.
// The data in the cap part of the slice would reference the prewrite keys whose
// underlying memory is borrowed from memdb. The reference cause GC can't release
// the memdb, leading to serious memory leak problems in the large transaction case.
+ b.entries.Reset()
for i := 0; i < len(b.requests); i++ {
b.requests[i] = nil
}
@@ -363,7 +336,8 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) {
a.fetchMorePendingRequests(int(cfg.MaxBatchSize), int(bestBatchWaitSize), cfg.MaxBatchWaitTime)
}
}
- a.pendingRequests.Observe(float64(len(a.batchCommandsCh) + a.reqBuilder.len()))
+ a.pendingRequests.Observe(float64(len(a.batchCommandsCh)))
+ a.batchSize.Observe(float64(a.reqBuilder.len()))
length := a.reqBuilder.len()
if uint(length) == 0 {
// The batch command channel is closed.
@@ -380,11 +354,6 @@ }
}
}

- const (
- SendFailedReasonNoAvailableLimit = "concurrency limit exceeded"
- SendFailedReasonTryLockForSendFail = "tryLockForSend fail"
- )
-
func (a *batchConn) getClientAndSend() {
if val, err := util.EvalFailpoint("mockBatchClientSendDelay"); err == nil {
if timeout, ok := val.(int); ok && timeout > 0 {
@@ -397,56 +366,36 @@ func (a *batchConn) getClientAndSend() {
cli *batchCommandsClient
target string
)
- reasons := make([]string, 0)
- hasHighPriorityTask := a.reqBuilder.hasHighPriorityTask()
for i := 0; i < len(a.batchCommandsClients); i++ {
a.index = (a.index + 1) % uint32(len(a.batchCommandsClients))
target = a.batchCommandsClients[a.index].target
// The lock protects the batchCommandsClient from been closed while it's in use.
- c := a.batchCommandsClients[a.index]
- if hasHighPriorityTask || c.available() > 0 {
- if c.tryLockForSend() {
- cli = c
- break
- } else {
- reasons = append(reasons, SendFailedReasonTryLockForSendFail)
- }
- } else {
- reasons = append(reasons, SendFailedReasonNoAvailableLimit)
+ if a.batchCommandsClients[a.index].tryLockForSend() {
+ cli = a.batchCommandsClients[a.index]
+ break
}
}
if cli == nil {
- logutil.BgLogger().Info("no available connections", zap.String("target", target), zap.Any("reasons", reasons))
+ logutil.BgLogger().Warn("no available connections", zap.String("target", target))
metrics.TiKVNoAvailableConnectionCounter.Inc()
- if config.GetGlobalConfig().TiKVClient.MaxConcurrencyRequestLimit == config.DefMaxConcurrencyRequestLimit {
- // Only cancel requests when MaxConcurrencyRequestLimit feature is not enabled, to be compatible with the behavior of older versions.
- // TODO: But when MaxConcurrencyRequestLimit feature is enabled, the requests won't be canceled and will wait until timeout.
- // This behavior may not be reasonable, as the timeout is usually 40s or 60s, which is too long to retry in time.
- a.reqBuilder.cancel(errors.New("no available connections"))
- }
+ // Please ensure the error is handled in region cache correctly.
+ a.reqBuilder.cancel(errors.New("no available connections"))
return
}
defer cli.unlockForSend()
- available := cli.available()
- batch := 0
- req, forwardingReqs := a.reqBuilder.buildWithLimit(available, func(id uint64, e *batchCommandsEntry) {
+
+ req, forwardingReqs := a.reqBuilder.build(func(id uint64, e *batchCommandsEntry) {
cli.batched.Store(id, e)
- cli.sent.Add(1)
if trace.IsEnabled() {
trace.Log(e.ctx, "rpc", "send")
}
})
if req != nil {
- batch += len(req.RequestIds)
cli.send("", req)
}
for forwardedHost, req := range forwardingReqs {
- batch += len(req.RequestIds)
cli.send(forwardedHost, req)
}
- if batch > 0 {
- a.batchSize.Observe(float64(batch))
- }
}

type tryLock struct {
@@ -558,11 +507,6 @@ type batchCommandsClient struct {
// tryLock protects client when re-create the streaming.
tryLock

- // sent is the number of the requests are processed by tikv server.
- sent atomic.Int64
- // maxConcurrencyRequestLimit is the max allowed number of requests to be sent the tikv
- maxConcurrencyRequestLimit atomic.Int64
-
// eventListener is the listener set by external code to observe some events in the client. It's stored in a atomic
// pointer to make setting thread-safe.
eventListener *atomic.Pointer[ClientEventListener]
@@ -572,10 +516,6 @@ func (c *batchCommandsClient) isStopped() bool {
return atomic.LoadInt32(&c.closed) != 0
}

- func (c *batchCommandsClient) available() int64 {
- return c.maxConcurrencyRequestLimit.Load() - c.sent.Load()
- }
-
func (c *batchCommandsClient) send(forwardedHost string, req *tikvpb.BatchCommandsRequest) {
err := c.initBatchClient(forwardedHost)
if err != nil {
@@ -641,7 +581,6 @@ func (c *batchCommandsClient) failRequestsByIDs(err error, requestIDs []uint64)

func (c *batchCommandsClient) failRequest(err error, requestID uint64, entry *batchCommandsEntry) {
c.batched.Delete(requestID)
- c.sent.Add(-1)
entry.error(err)
}

@@ -763,7 +702,6 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient, tikvTransport
entry.res <- responses[i]
}
c.batched.Delete(requestID)
- c.sent.Add(-1)
}

transportLayerLoad := resp.GetTransportLayerLoad()
@@ -888,7 +826,6 @@ func sendBatchRequest(
batchConn *batchConn,
req *tikvpb.BatchCommandsRequest_Request,
timeout time.Duration,
- priority uint64,
) (*tikvrpc.Response, error) {
entry := &batchCommandsEntry{
ctx: ctx,
@@ -897,7 +834,6 @@ func sendBatchRequest(
forwardedHost: forwardedHost,
canceled: 0,
err: nil,
- pri: priority,
}
timer := time.NewTimer(timeout)
defer timer.Stop()
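
The NOTE kept in reset() above is about slice reuse: truncating with entries[:0] keeps the old pointers alive in the backing array, so anything they reference (prewrite keys borrowed from memdb, in the large-transaction case) cannot be garbage collected. The loop that nils out b.requests[i] avoids that. A generic illustration of the pattern, independent of memdb:

package main

import "fmt"

type entry struct {
	payload []byte // stands in for memory borrowed elsewhere (e.g. prewrite keys)
}

// resetBad reslices to length 0: the backing array still holds the old
// pointers, so the payloads stay reachable as long as the slice is reused.
func resetBad(buf []*entry) []*entry {
	return buf[:0]
}

// resetGood mirrors what batchCommandsBuilder.reset does with b.requests:
// clear each slot first so the GC can reclaim the referenced payloads even
// though the slice's capacity is kept for reuse.
func resetGood(buf []*entry) []*entry {
	for i := range buf {
		buf[i] = nil
	}
	return buf[:0]
}

func main() {
	buf := []*entry{{payload: make([]byte, 1<<20)}}
	fmt.Println(len(resetBad(buf)), cap(resetBad(buf)))   // 0 1 - payload still pinned by the backing array
	fmt.Println(len(resetGood(buf)), cap(resetGood(buf))) // 0 1 - slots cleared, payload collectable
}
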