From 4fb9989cc0babb5a01e500d80804ebabd328a3c9 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Wed, 22 May 2024 20:38:00 +0800 Subject: [PATCH] Revert "client: Cache tikv request in tidb client side (#1098)" This reverts commit 824302acd0d8ffe847c4fe3723cba714aafb1023. Signed-off-by: crazycs520 --- config/client.go | 8 +- examples/gcworker/go.mod | 1 + internal/client/client.go | 4 +- internal/client/client_batch.go | 136 +++++++------------------ internal/client/client_test.go | 130 ++++------------------- internal/client/priority_queue.go | 82 ++++----------- internal/client/priority_queue_test.go | 58 ++++------- 7 files changed, 99 insertions(+), 320 deletions(-) diff --git a/config/client.go b/config/client.go index b65d55ebe6..32214e1087 100644 --- a/config/client.go +++ b/config/client.go @@ -97,9 +97,6 @@ type TiKVClient struct { // TTLRefreshedTxnSize controls whether a transaction should update its TTL or not. TTLRefreshedTxnSize int64 `toml:"ttl-refreshed-txn-size" json:"ttl-refreshed-txn-size"` ResolveLockLiteThreshold uint64 `toml:"resolve-lock-lite-threshold" json:"resolve-lock-lite-threshold"` - // MaxConcurrencyRequestLimit is the max concurrency number of request to be sent the tikv - // 0 means auto adjust by feedback. - MaxConcurrencyRequestLimit int64 `toml:"max-concurrency-request-limit" json:"max-concurrency-request-limit"` // EnableReplicaSelectorV2 indicate whether to use the new replica-selector-v2. // TODO(crazycs520): remove this config after the new replica-selector-v2 is stable. EnableReplicaSelectorV2 bool `toml:"enable-replica-selector-v2" json:"enable-replica-selector-v2"` @@ -174,9 +171,8 @@ func DefaultTiKVClient() TiKVClient { }, CoprReqTimeout: 60 * time.Second, - ResolveLockLiteThreshold: 16, - MaxConcurrencyRequestLimit: DefMaxConcurrencyRequestLimit, - EnableReplicaSelectorV2: true, + ResolveLockLiteThreshold: 16, + EnableReplicaSelectorV2: true, } } diff --git a/examples/gcworker/go.mod b/examples/gcworker/go.mod index 2159df8033..c255fde50a 100644 --- a/examples/gcworker/go.mod +++ b/examples/gcworker/go.mod @@ -18,6 +18,7 @@ require ( github.com/google/btree v1.1.2 // indirect github.com/google/uuid v1.6.0 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 // indirect + github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect diff --git a/internal/client/client.go b/internal/client/client.go index 31d837b576..0efa1183d6 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -365,7 +365,6 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint tryLock: tryLock{sync.NewCond(new(sync.Mutex)), false}, eventListener: eventListener, } - batchClient.maxConcurrencyRequestLimit.Store(cfg.TiKVClient.MaxConcurrencyRequestLimit) a.batchCommandsClients = append(a.batchCommandsClients, batchClient) } } @@ -686,11 +685,10 @@ func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.R // TiDB RPC server supports batch RPC, but batch connection will send heart beat, It's not necessary since // request to TiDB is not high frequency. 
- pri := req.GetResourceControlContext().GetOverridePriority() if config.GetGlobalConfig().TiKVClient.MaxBatchSize > 0 && enableBatch { if batchReq := req.ToBatchCommandsRequest(); batchReq != nil { defer trace.StartRegion(ctx, req.Type.String()).End() - return wrapErrConn(sendBatchRequest(ctx, addr, req.ForwardedHost, connArray.batchConn, batchReq, timeout, pri)) + return wrapErrConn(sendBatchRequest(ctx, addr, req.ForwardedHost, connArray.batchConn, batchReq, timeout)) } } diff --git a/internal/client/client_batch.go b/internal/client/client_batch.go index 32e8a4e2ba..b5eb7641e3 100644 --- a/internal/client/client_batch.go +++ b/internal/client/client_batch.go @@ -70,15 +70,15 @@ type batchCommandsEntry struct { // canceled indicated the request is canceled or not. canceled int32 err error - pri uint64 } func (b *batchCommandsEntry) isCanceled() bool { return atomic.LoadInt32(&b.canceled) == 1 } -func (b *batchCommandsEntry) priority() uint64 { - return b.pri +// TODO: implement by the request priority. +func (b *batchCommandsEntry) priority() int { + return 0 } func (b *batchCommandsEntry) error(err error) { @@ -107,58 +107,33 @@ func (b *batchCommandsBuilder) push(entry *batchCommandsEntry) { b.entries.Push(entry) } -const highTaskPriority = 10 - -func (b *batchCommandsBuilder) hasHighPriorityTask() bool { - return b.entries.highestPriority() >= highTaskPriority -} - -// buildWithLimit builds BatchCommandsRequests with the given limit. -// the highest priority tasks don't consume any limit, -// so the limit only works for normal tasks. +// build builds BatchCommandsRequests and calls collect() for each valid entry. // The first return value is the request that doesn't need forwarding. // The second is a map that maps forwarded hosts to requests. 
-func (b *batchCommandsBuilder) buildWithLimit(limit int64, collect func(id uint64, e *batchCommandsEntry), +func (b *batchCommandsBuilder) build( + collect func(id uint64, e *batchCommandsEntry), ) (*tikvpb.BatchCommandsRequest, map[string]*tikvpb.BatchCommandsRequest) { - count := int64(0) - build := func(reqs []Item) { - for _, e := range reqs { - e := e.(*batchCommandsEntry) - if e.isCanceled() { - continue - } - if e.priority() < highTaskPriority { - count++ - } - - if collect != nil { - collect(b.idAlloc, e) - } - if e.forwardedHost == "" { - b.requestIDs = append(b.requestIDs, b.idAlloc) - b.requests = append(b.requests, e.req) - } else { - batchReq, ok := b.forwardingReqs[e.forwardedHost] - if !ok { - batchReq = &tikvpb.BatchCommandsRequest{} - b.forwardingReqs[e.forwardedHost] = batchReq - } - batchReq.RequestIds = append(batchReq.RequestIds, b.idAlloc) - batchReq.Requests = append(batchReq.Requests, e.req) - } - b.idAlloc++ + for _, entry := range b.entries.All() { + e := entry.(*batchCommandsEntry) + if e.isCanceled() { + continue } - } - for (count < limit && b.entries.Len() > 0) || b.hasHighPriorityTask() { - n := limit - if limit == 0 { - n = 1 + if collect != nil { + collect(b.idAlloc, e) } - reqs := b.entries.Take(int(n)) - if len(reqs) == 0 { - break + if e.forwardedHost == "" { + b.requestIDs = append(b.requestIDs, b.idAlloc) + b.requests = append(b.requests, e.req) + } else { + batchReq, ok := b.forwardingReqs[e.forwardedHost] + if !ok { + batchReq = &tikvpb.BatchCommandsRequest{} + b.forwardingReqs[e.forwardedHost] = batchReq + } + batchReq.RequestIds = append(batchReq.RequestIds, b.idAlloc) + batchReq.Requests = append(batchReq.Requests, e.req) } - build(reqs) + b.idAlloc++ } var req *tikvpb.BatchCommandsRequest if len(b.requests) > 0 { @@ -170,22 +145,20 @@ func (b *batchCommandsBuilder) buildWithLimit(limit int64, collect func(id uint6 return req, b.forwardingReqs } -// cancel all requests, only used in test. func (b *batchCommandsBuilder) cancel(e error) { - for _, entry := range b.entries.all() { + for _, entry := range b.entries.All() { entry.(*batchCommandsEntry).error(e) } - b.entries.reset() } // reset resets the builder to the initial state. // Should call it before collecting a new batch. func (b *batchCommandsBuilder) reset() { - b.entries.clean() // NOTE: We can't simply set entries = entries[:0] here. // The data in the cap part of the slice would reference the prewrite keys whose // underlying memory is borrowed from memdb. The reference cause GC can't release // the memdb, leading to serious memory leak problems in the large transaction case. + b.entries.Reset() for i := 0; i < len(b.requests); i++ { b.requests[i] = nil } @@ -363,7 +336,8 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) { a.fetchMorePendingRequests(int(cfg.MaxBatchSize), int(bestBatchWaitSize), cfg.MaxBatchWaitTime) } } - a.pendingRequests.Observe(float64(len(a.batchCommandsCh) + a.reqBuilder.len())) + a.pendingRequests.Observe(float64(len(a.batchCommandsCh))) + a.batchSize.Observe(float64(a.reqBuilder.len())) length := a.reqBuilder.len() if uint(length) == 0 { // The batch command channel is closed. 
@@ -380,11 +354,6 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) { } } -const ( - SendFailedReasonNoAvailableLimit = "concurrency limit exceeded" - SendFailedReasonTryLockForSendFail = "tryLockForSend fail" -) - func (a *batchConn) getClientAndSend() { if val, err := util.EvalFailpoint("mockBatchClientSendDelay"); err == nil { if timeout, ok := val.(int); ok && timeout > 0 { @@ -397,56 +366,36 @@ func (a *batchConn) getClientAndSend() { cli *batchCommandsClient target string ) - reasons := make([]string, 0) - hasHighPriorityTask := a.reqBuilder.hasHighPriorityTask() for i := 0; i < len(a.batchCommandsClients); i++ { a.index = (a.index + 1) % uint32(len(a.batchCommandsClients)) target = a.batchCommandsClients[a.index].target // The lock protects the batchCommandsClient from been closed while it's in use. - c := a.batchCommandsClients[a.index] - if hasHighPriorityTask || c.available() > 0 { - if c.tryLockForSend() { - cli = c - break - } else { - reasons = append(reasons, SendFailedReasonTryLockForSendFail) - } - } else { - reasons = append(reasons, SendFailedReasonNoAvailableLimit) + if a.batchCommandsClients[a.index].tryLockForSend() { + cli = a.batchCommandsClients[a.index] + break } } if cli == nil { - logutil.BgLogger().Info("no available connections", zap.String("target", target), zap.Any("reasons", reasons)) + logutil.BgLogger().Warn("no available connections", zap.String("target", target)) metrics.TiKVNoAvailableConnectionCounter.Inc() - if config.GetGlobalConfig().TiKVClient.MaxConcurrencyRequestLimit == config.DefMaxConcurrencyRequestLimit { - // Only cancel requests when MaxConcurrencyRequestLimit feature is not enabled, to be compatible with the behavior of older versions. - // TODO: But when MaxConcurrencyRequestLimit feature is enabled, the requests won't be canceled and will wait until timeout. - // This behavior may not be reasonable, as the timeout is usually 40s or 60s, which is too long to retry in time. - a.reqBuilder.cancel(errors.New("no available connections")) - } + // Please ensure the error is handled in region cache correctly. + a.reqBuilder.cancel(errors.New("no available connections")) return } defer cli.unlockForSend() - available := cli.available() - batch := 0 - req, forwardingReqs := a.reqBuilder.buildWithLimit(available, func(id uint64, e *batchCommandsEntry) { + + req, forwardingReqs := a.reqBuilder.build(func(id uint64, e *batchCommandsEntry) { cli.batched.Store(id, e) - cli.sent.Add(1) if trace.IsEnabled() { trace.Log(e.ctx, "rpc", "send") } }) if req != nil { - batch += len(req.RequestIds) cli.send("", req) } for forwardedHost, req := range forwardingReqs { - batch += len(req.RequestIds) cli.send(forwardedHost, req) } - if batch > 0 { - a.batchSize.Observe(float64(batch)) - } } type tryLock struct { @@ -558,11 +507,6 @@ type batchCommandsClient struct { // tryLock protects client when re-create the streaming. tryLock - // sent is the number of the requests are processed by tikv server. - sent atomic.Int64 - // maxConcurrencyRequestLimit is the max allowed number of requests to be sent the tikv - maxConcurrencyRequestLimit atomic.Int64 - // eventListener is the listener set by external code to observe some events in the client. It's stored in a atomic // pointer to make setting thread-safe. 
eventListener *atomic.Pointer[ClientEventListener] @@ -572,10 +516,6 @@ func (c *batchCommandsClient) isStopped() bool { return atomic.LoadInt32(&c.closed) != 0 } -func (c *batchCommandsClient) available() int64 { - return c.maxConcurrencyRequestLimit.Load() - c.sent.Load() -} - func (c *batchCommandsClient) send(forwardedHost string, req *tikvpb.BatchCommandsRequest) { err := c.initBatchClient(forwardedHost) if err != nil { @@ -641,7 +581,6 @@ func (c *batchCommandsClient) failRequestsByIDs(err error, requestIDs []uint64) func (c *batchCommandsClient) failRequest(err error, requestID uint64, entry *batchCommandsEntry) { c.batched.Delete(requestID) - c.sent.Add(-1) entry.error(err) } @@ -763,7 +702,6 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient, tikvTransport entry.res <- responses[i] } c.batched.Delete(requestID) - c.sent.Add(-1) } transportLayerLoad := resp.GetTransportLayerLoad() @@ -888,7 +826,6 @@ func sendBatchRequest( batchConn *batchConn, req *tikvpb.BatchCommandsRequest_Request, timeout time.Duration, - priority uint64, ) (*tikvrpc.Response, error) { entry := &batchCommandsEntry{ ctx: ctx, @@ -897,7 +834,6 @@ func sendBatchRequest( forwardedHost: forwardedHost, canceled: 0, err: nil, - pri: priority, } timer := time.NewTimer(timeout) defer timer.Stop() diff --git a/internal/client/client_test.go b/internal/client/client_test.go index 1fe5f7eb7e..e6928dd1e6 100644 --- a/internal/client/client_test.go +++ b/internal/client/client_test.go @@ -37,7 +37,6 @@ package client import ( "context" "fmt" - "math" "math/rand" "runtime" "strconv" @@ -118,24 +117,19 @@ func TestCancelTimeoutRetErr(t *testing.T) { ctx, cancel := context.WithCancel(context.TODO()) cancel() - _, err := sendBatchRequest(ctx, "", "", a, req, 2*time.Second, 0) + _, err := sendBatchRequest(ctx, "", "", a, req, 2*time.Second) assert.Equal(t, errors.Cause(err), context.Canceled) - _, err = sendBatchRequest(context.Background(), "", "", a, req, 0, 0) + _, err = sendBatchRequest(context.Background(), "", "", a, req, 0) assert.Equal(t, errors.Cause(err), context.DeadlineExceeded) } func TestSendWhenReconnect(t *testing.T) { server, port := mockserver.StartMockTikvService() require.True(t, port > 0) - restoreFn := config.UpdateGlobal(func(conf *config.Config) { - conf.TiKVClient.MaxConcurrencyRequestLimit = 10000 - }) - rpcClient := NewRPCClient() defer func() { rpcClient.Close() - restoreFn() }() addr := server.Addr() conn, err := rpcClient.getConnArray(addr, true) @@ -147,8 +141,8 @@ func TestSendWhenReconnect(t *testing.T) { } req := tikvrpc.NewRequest(tikvrpc.CmdEmpty, &tikvpb.BatchCommandsEmptyRequest{}) - _, err = rpcClient.SendRequest(context.Background(), addr, req, 5*time.Second) - require.Regexp(t, "wait recvLoop timeout, timeout:5s, wait_send_duration:.*, wait_recv_duration:.*: context deadline exceeded", err.Error()) + _, err = rpcClient.SendRequest(context.Background(), addr, req, 100*time.Second) + assert.Contains(t, err.Error(), "no available connections") server.Stop() } @@ -401,7 +395,7 @@ func TestBatchCommandsBuilder(t *testing.T) { assert.Equal(t, builder.len(), i+1) } entryMap := make(map[uint64]*batchCommandsEntry) - batchedReq, forwardingReqs := builder.buildWithLimit(math.MaxInt64, func(id uint64, e *batchCommandsEntry) { + batchedReq, forwardingReqs := builder.build(func(id uint64, e *batchCommandsEntry) { entryMap[id] = e }) assert.Equal(t, len(batchedReq.GetRequests()), 10) @@ -427,7 +421,7 @@ func TestBatchCommandsBuilder(t *testing.T) { } } entryMap = 
make(map[uint64]*batchCommandsEntry) - batchedReq, forwardingReqs = builder.buildWithLimit(math.MaxInt64, func(id uint64, e *batchCommandsEntry) { + batchedReq, forwardingReqs = builder.build(func(id uint64, e *batchCommandsEntry) { entryMap[id] = e }) assert.Equal(t, len(batchedReq.GetRequests()), 1) @@ -437,8 +431,8 @@ func TestBatchCommandsBuilder(t *testing.T) { assert.Equal(t, len(forwardingReqs[host].GetRequests()), i+2) assert.Equal(t, len(forwardingReqs[host].GetRequestIds()), i+2) } - assert.Equal(t, int(builder.idAlloc), 20) - assert.Equal(t, len(entryMap), 10) + assert.Equal(t, builder.idAlloc, uint64(10+builder.len())) + assert.Equal(t, len(entryMap), builder.len()) for host, forwardingReq := range forwardingReqs { for i, id := range forwardingReq.GetRequestIds() { assert.Equal(t, entryMap[id].req, forwardingReq.GetRequests()[i]) @@ -459,7 +453,7 @@ func TestBatchCommandsBuilder(t *testing.T) { builder.push(entry) } entryMap = make(map[uint64]*batchCommandsEntry) - batchedReq, forwardingReqs = builder.buildWithLimit(math.MaxInt64, func(id uint64, e *batchCommandsEntry) { + batchedReq, forwardingReqs = builder.build(func(id uint64, e *batchCommandsEntry) { entryMap[id] = e }) assert.Equal(t, len(batchedReq.GetRequests()), 2) @@ -490,6 +484,7 @@ func TestBatchCommandsBuilder(t *testing.T) { // Test reset builder.reset() assert.Equal(t, builder.len(), 0) + assert.Equal(t, builder.entries.Len(), 0) assert.Equal(t, len(builder.requests), 0) assert.Equal(t, len(builder.requestIDs), 0) assert.Equal(t, len(builder.forwardingReqs), 0) @@ -659,6 +654,10 @@ func TestTraceExecDetails(t *testing.T) { } func TestBatchClientRecoverAfterServerRestart(t *testing.T) { + config.UpdateGlobal(func(conf *config.Config) { + conf.TiKVClient.MaxBatchSize = 128 + })() + server, port := mockserver.StartMockTikvService() require.True(t, port > 0) require.True(t, server.IsRunning()) @@ -675,7 +674,7 @@ func TestBatchClientRecoverAfterServerRestart(t *testing.T) { assert.Nil(t, err) // send some request, it should be success. for i := 0; i < 100; i++ { - _, err = sendBatchRequest(context.Background(), addr, "", conn.batchConn, req, time.Second*20, 0) + _, err = sendBatchRequest(context.Background(), addr, "", conn.batchConn, req, time.Second*20) require.NoError(t, err) } @@ -684,8 +683,8 @@ func TestBatchClientRecoverAfterServerRestart(t *testing.T) { require.False(t, server.IsRunning()) // send some request, it should be failed since server is down. - for i := 0; i < 10; i++ { - _, err = sendBatchRequest(context.Background(), addr, "", conn.batchConn, req, time.Millisecond*100, 0) + for i := 0; i < 200; i++ { + _, err = sendBatchRequest(context.Background(), addr, "", conn.batchConn, req, time.Second*20) require.Error(t, err) time.Sleep(time.Millisecond * time.Duration(rand.Intn(300))) grpcConn := conn.Get() @@ -728,100 +727,11 @@ func TestBatchClientRecoverAfterServerRestart(t *testing.T) { // send some request, it should be success again. 
for i := 0; i < 100; i++ { - _, err = sendBatchRequest(context.Background(), addr, "", conn.batchConn, req, time.Second*20, 0) + _, err = sendBatchRequest(context.Background(), addr, "", conn.batchConn, req, time.Second*20) require.NoError(t, err) } } -func TestLimitConcurrency(t *testing.T) { - re := require.New(t) - batch := newBatchConn(1, 128, nil) - { - batch.reqBuilder.push(&batchCommandsEntry{req: &tikvpb.BatchCommandsRequest_Request{}}) - reqs, _ := batch.reqBuilder.buildWithLimit(1, func(_ uint64, _ *batchCommandsEntry) {}) - re.Len(reqs.RequestIds, 1) - re.Equal(0, batch.reqBuilder.len()) - batch.reqBuilder.reset() - } - - // highest priority task will be sent immediately, not limited by concurrency - { - batch.reqBuilder.push(&batchCommandsEntry{req: &tikvpb.BatchCommandsRequest_Request{}, pri: highTaskPriority}) - batch.reqBuilder.push(&batchCommandsEntry{req: &tikvpb.BatchCommandsRequest_Request{}, pri: highTaskPriority - 1}) - reqs, _ := batch.reqBuilder.buildWithLimit(0, func(_ uint64, _ *batchCommandsEntry) {}) - re.Len(reqs.RequestIds, 1) - batch.reqBuilder.reset() - re.Equal(1, batch.reqBuilder.len()) - } - - // medium priority tasks are limited by concurrency - { - batch.reqBuilder.push(&batchCommandsEntry{req: &tikvpb.BatchCommandsRequest_Request{}}) - batch.reqBuilder.push(&batchCommandsEntry{req: &tikvpb.BatchCommandsRequest_Request{}}) - reqs, _ := batch.reqBuilder.buildWithLimit(2, func(_ uint64, _ *batchCommandsEntry) {}) - re.Len(reqs.RequestIds, 2) - re.Equal(1, batch.reqBuilder.len()) - batch.reqBuilder.reset() - } - - // the expired tasks should be removed from the queue. - { - batch.reqBuilder.push(&batchCommandsEntry{req: &tikvpb.BatchCommandsRequest_Request{}, canceled: 1}) - batch.reqBuilder.push(&batchCommandsEntry{req: &tikvpb.BatchCommandsRequest_Request{}, canceled: 1}) - batch.reqBuilder.reset() - re.Equal(1, batch.reqBuilder.len()) - } - -} - -func TestPrioritySentLimit(t *testing.T) { - re := require.New(t) - restoreFn := config.UpdateGlobal(func(conf *config.Config) { - conf.TiKVClient.MaxConcurrencyRequestLimit = 2 - conf.TiKVClient.GrpcConnectionCount = 1 - }) - defer restoreFn() - - server, port := mockserver.StartMockTikvService() - re.Greater(port, 0) - rpcClient := NewRPCClient() - defer rpcClient.Close() - addr := server.Addr() - wait := sync.WaitGroup{} - bench := 10 - wait.Add(bench * 2) - ctx, cancelFn := context.WithTimeout(context.Background(), time.Second*10) - defer cancelFn() - sendFn := func(pri uint64, dur *atomic.Int64, qps *atomic.Int64) { - for i := 0; i < bench; i++ { - go func() { - for { - req := tikvrpc.NewRequest(tikvrpc.CmdEmpty, &tikvpb.BatchCommandsEmptyRequest{}) - req.ResourceControlContext = &kvrpcpb.ResourceControlContext{OverridePriority: pri} - now := time.Now() - rpcClient.SendRequest(context.Background(), addr, req, 100*time.Millisecond) - dur.Add(time.Since(now).Microseconds()) - qps.Add(1) - select { - case <-ctx.Done(): - wait.Done() - return - default: - } - } - }() - } - } - - highDur, mediumDur := atomic.Int64{}, atomic.Int64{} - highQps, mediumQps := atomic.Int64{}, atomic.Int64{} - go sendFn(16, &highDur, &highQps) - go sendFn(8, &mediumDur, &mediumQps) - wait.Wait() - re.Less(highDur.Load()/highQps.Load()*2, mediumDur.Load()/mediumQps.Load()) - server.Stop() -} - type testClientEventListener struct { healthFeedbackCh chan *tikvpb.HealthFeedback } @@ -952,7 +862,7 @@ func TestFastFailWhenNoAvailableConn(t *testing.T) { req := &tikvpb.BatchCommandsRequest_Request{Cmd: 
&tikvpb.BatchCommandsRequest_Request_Coprocessor{Coprocessor: &coprocessor.Request{}}} conn, err := client.getConnArray(addr, true) assert.Nil(t, err) - _, err = sendBatchRequest(context.Background(), addr, "", conn.batchConn, req, time.Second, 0) + _, err = sendBatchRequest(context.Background(), addr, "", conn.batchConn, req, time.Second) require.NoError(t, err) for _, c := range conn.batchConn.batchCommandsClients { @@ -961,7 +871,7 @@ func TestFastFailWhenNoAvailableConn(t *testing.T) { } start := time.Now() timeout := time.Second - _, err = sendBatchRequest(context.Background(), addr, "", conn.batchConn, req, timeout, 0) + _, err = sendBatchRequest(context.Background(), addr, "", conn.batchConn, req, timeout) require.Error(t, err) require.Equal(t, "no available connections", err.Error()) require.Less(t, time.Since(start), timeout) diff --git a/internal/client/priority_queue.go b/internal/client/priority_queue.go index 4622b19614..f70afa106a 100644 --- a/internal/client/priority_queue.go +++ b/internal/client/priority_queue.go @@ -18,18 +18,17 @@ import "container/heap" // Item is the interface that all entries in a priority queue must implement. type Item interface { - priority() uint64 - // isCanceled returns true if the item is canceled by the caller. - isCanceled() bool + priority() int } // entry is an entry in a priority queue. -//type entry struct { -// entry Item -//} +type entry struct { + entry Item + index int +} // prioritySlice implements heap.Interface and holds Entries. -type prioritySlice []Item +type prioritySlice []entry // Len returns the length of the priority queue. func (ps prioritySlice) Len() int { @@ -39,17 +38,20 @@ func (ps prioritySlice) Len() int { // Less compares two entries in the priority queue. // The higher priority entry is the one with the lower value. func (ps prioritySlice) Less(i, j int) bool { - return ps[i].priority() > ps[j].priority() + return ps[i].entry.priority() > ps[j].entry.priority() } // Swap swaps two entries in the priority queue. func (ps prioritySlice) Swap(i, j int) { ps[i], ps[j] = ps[j], ps[i] + ps[i].index = i + ps[j].index = j } // Push adds an entry to the priority queue. func (ps *prioritySlice) Push(x interface{}) { - item := x.(Item) + item := x.(entry) + item.index = len(*ps) *ps = append(*ps, item) } @@ -58,6 +60,7 @@ func (ps *prioritySlice) Pop() interface{} { old := *ps n := len(old) item := old[n-1] + item.index = -1 *ps = old[0 : n-1] return item } @@ -79,68 +82,27 @@ func (pq *PriorityQueue) Len() int { // Push adds an entry to the priority queue. func (pq *PriorityQueue) Push(item Item) { - heap.Push(&pq.ps, item) -} - -// pop removes the highest priority entry from the priority queue. -func (pq *PriorityQueue) pop() Item { - e := heap.Pop(&pq.ps) - if e == nil { - return nil - } - return e.(Item) -} - -// Take returns the highest priority entries from the priority queue. -func (pq *PriorityQueue) Take(n int) []Item { - if n <= 0 { - return nil - } - if n >= pq.Len() { - ret := pq.ps - pq.ps = pq.ps[:0] - return ret - } else { - ret := make([]Item, n) - for i := 0; i < n; i++ { - ret[i] = pq.pop() - } - return ret - } - + heap.Push(&pq.ps, entry{entry: item}) } -func (pq *PriorityQueue) highestPriority() uint64 { - if pq.Len() == 0 { - return 0 - } - return pq.ps[0].priority() +// Pop removes the highest priority entry from the priority queue. +func (pq *PriorityQueue) Pop() Item { + return heap.Pop(&pq.ps).(entry).entry } -// all returns all entries in the priority queue not ensure the priority. 
-func (pq *PriorityQueue) all() []Item { +// All returns all entries in the priority queue not ensure the priority. +func (pq *PriorityQueue) All() []Item { items := make([]Item, 0, pq.Len()) for i := 0; i < pq.Len(); i++ { - items = append(items, pq.ps[i]) + items = append(items, pq.ps[i].entry) } return items } -// clean removes all canceled entries from the priority queue. -func (pq *PriorityQueue) clean() { - for i := 0; i < pq.Len(); { - if pq.ps[i].isCanceled() { - heap.Remove(&pq.ps, i) - continue - } - i++ - } -} - -// reset clear all entry in the queue. -func (pq *PriorityQueue) reset() { +// Reset resets the priority queue. +func (pq *PriorityQueue) Reset() { for i := 0; i < pq.Len(); i++ { - pq.ps[i] = nil + pq.ps[i].entry = nil } pq.ps = pq.ps[:0] } diff --git a/internal/client/priority_queue_test.go b/internal/client/priority_queue_test.go index e64bbf5262..e249155c9d 100644 --- a/internal/client/priority_queue_test.go +++ b/internal/client/priority_queue_test.go @@ -21,53 +21,29 @@ import ( ) type FakeItem struct { - pri uint64 - value int - canceled bool + pri int + value int } -func (f *FakeItem) priority() uint64 { +func (f *FakeItem) priority() int { return f.pri } -func (f *FakeItem) isCanceled() bool { - return f.canceled -} - func TestPriority(t *testing.T) { re := require.New(t) - testFunc := func(aq *PriorityQueue) { - for i := 1; i <= 5; i++ { - aq.Push(&FakeItem{value: i, pri: uint64(i)}) - } - re.Equal(5, aq.Len()) - re.Equal(uint64(5), aq.highestPriority()) - aq.clean() - re.Equal(5, aq.Len()) - - arr := aq.Take(1) - re.Len(arr, 1) - re.Equal(uint64(5), arr[0].priority()) - re.Equal(uint64(4), aq.highestPriority()) - - arr = aq.Take(2) - re.Len(arr, 2) - re.Equal(uint64(4), arr[0].priority()) - re.Equal(uint64(3), arr[1].priority()) - re.Equal(uint64(2), aq.highestPriority()) - - arr = aq.Take(5) - re.Len(arr, 2) - re.Equal(uint64(2), arr[0].priority()) - re.Equal(uint64(1), arr[1].priority()) - re.Equal(uint64(0), aq.highestPriority()) - re.Equal(0, aq.Len()) - - aq.Push(&FakeItem{value: 1, pri: 1, canceled: true}) - re.Equal(1, aq.Len()) - aq.clean() - re.Equal(0, aq.Len()) + pq := NewPriorityQueue() + for i := 1; i <= 5; i++ { + pq.Push(&FakeItem{value: i, pri: i}) + } + re.Equal(5, pq.Len()) + arr := pq.All() + re.Len(arr, 5) + pq.Reset() + re.Equal(0, pq.Len()) + for i := 1; i <= 5; i++ { + pq.Push(&FakeItem{value: i, pri: i}) + } + for i := pq.Len(); i > 0; i-- { + re.Equal(i, pq.Pop().(*FakeItem).value) } - hq := NewPriorityQueue() - testFunc(hq) }
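
For reference, the code path restored by this revert boils down to a plain max-heap: build() drains every pending entry (no per-connection concurrency limit, no high-priority bypass) and sendBatchRequest no longer carries a priority argument. The standalone Go sketch below mirrors the shape of internal/client/priority_queue.go as it stands after the revert. The Item, entry, prioritySlice and PriorityQueue names follow that file; fakeItem and main are illustrative additions only and are not part of the patch.

// pqsketch: self-contained illustration of the heap-based priority queue
// restored by this revert. Higher priority() values are popped first.
package main

import (
	"container/heap"
	"fmt"
)

// Item is anything that can report its priority.
type Item interface {
	priority() int
}

// entry wraps an Item together with its index in the heap.
type entry struct {
	item  Item
	index int
}

// prioritySlice implements heap.Interface; larger priority() sorts first.
type prioritySlice []entry

func (ps prioritySlice) Len() int           { return len(ps) }
func (ps prioritySlice) Less(i, j int) bool { return ps[i].item.priority() > ps[j].item.priority() }
func (ps prioritySlice) Swap(i, j int) {
	ps[i], ps[j] = ps[j], ps[i]
	ps[i].index = i
	ps[j].index = j
}

func (ps *prioritySlice) Push(x interface{}) {
	e := x.(entry)
	e.index = len(*ps)
	*ps = append(*ps, e)
}

func (ps *prioritySlice) Pop() interface{} {
	old := *ps
	n := len(old)
	e := old[n-1]
	*ps = old[:n-1]
	return e
}

// PriorityQueue exposes only Push, Pop, All and Reset, matching the reverted type.
type PriorityQueue struct{ ps prioritySlice }

func (pq *PriorityQueue) Len() int       { return pq.ps.Len() }
func (pq *PriorityQueue) Push(item Item) { heap.Push(&pq.ps, entry{item: item}) }
func (pq *PriorityQueue) Pop() Item      { return heap.Pop(&pq.ps).(entry).item }

// All returns every queued item without ordering guarantees, like the reverted
// build() path that collects the whole backlog into one BatchCommandsRequest.
func (pq *PriorityQueue) All() []Item {
	items := make([]Item, 0, pq.Len())
	for i := range pq.ps {
		items = append(items, pq.ps[i].item)
	}
	return items
}

// Reset drops item references so borrowed memory (e.g. memdb prewrite keys)
// can be released, then truncates the slice.
func (pq *PriorityQueue) Reset() {
	for i := range pq.ps {
		pq.ps[i].item = nil
	}
	pq.ps = pq.ps[:0]
}

// fakeItem is a test-style stand-in, analogous to FakeItem in priority_queue_test.go.
type fakeItem struct{ pri, value int }

func (f *fakeItem) priority() int { return f.pri }

func main() {
	pq := &PriorityQueue{}
	for i := 1; i <= 5; i++ {
		pq.Push(&fakeItem{pri: i, value: i})
	}
	// Pop yields the highest-priority entry first: 5, 4, 3, 2, 1.
	for pq.Len() > 0 {
		fmt.Println(pq.Pop().(*fakeItem).value)
	}
}

Pop always returns the highest-priority entry, while All and Reset carry the semantics relied on by batchCommandsBuilder.cancel and reset in the hunks above; the per-request limit bookkeeping (sent, maxConcurrencyRequestLimit, buildWithLimit) is gone entirely after this revert.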