From 6978558f4e9770abc137af55b1e75e792face795 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Fri, 2 Feb 2024 17:30:25 +0800 Subject: [PATCH] client: return total wait duration in resource interceptor OnRequestWait call (#7488) (#7794) ref tikv/pd#5851 Signed-off-by: ti-chi-bot Signed-off-by: glorv Co-authored-by: glorv --- .../resource_group/controller/controller.go | 15 ++++---- .../controller/controller_test.go | 2 +- .../resourcemanager/resource_manager_test.go | 34 +++++++++---------- 3 files changed, 27 insertions(+), 24 deletions(-) diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index 4a5b5779b33..241e4ab81bf 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -66,7 +66,7 @@ func logControllerTrace(msg string, fields ...zap.Field) { // ResourceGroupKVInterceptor is used as quota limit controller for resource group using kv store. type ResourceGroupKVInterceptor interface { // OnRequestWait is used to check whether resource group has enough tokens. It maybe needs to wait some time. - OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) (*rmpb.Consumption, *rmpb.Consumption, uint32, error) + OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) (*rmpb.Consumption, *rmpb.Consumption, time.Duration, uint32, error) // OnResponse is used to consume tokens after receiving response. OnResponse(resourceGroupName string, req RequestInfo, resp ResponseInfo) (*rmpb.Consumption, error) // IsBackgroundRequest If the resource group has background jobs, we should not record consumption and wait for it. @@ -538,10 +538,10 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context, // OnRequestWait is used to check whether resource group has enough tokens. It maybe needs to wait some time. func (c *ResourceGroupsController) OnRequestWait( ctx context.Context, resourceGroupName string, info RequestInfo, -) (*rmpb.Consumption, *rmpb.Consumption, uint32, error) { +) (*rmpb.Consumption, *rmpb.Consumption, time.Duration, uint32, error) { gc, err := c.tryGetResourceGroup(ctx, resourceGroupName) if err != nil { - return nil, nil, 0, err + return nil, nil, time.Duration(0), 0, err } return gc.onRequestWait(ctx, info) } @@ -1190,7 +1190,7 @@ func (gc *groupCostController) calcRequest(counter *tokenCounter) float64 { func (gc *groupCostController) onRequestWait( ctx context.Context, info RequestInfo, -) (*rmpb.Consumption, *rmpb.Consumption, uint32, error) { +) (*rmpb.Consumption, *rmpb.Consumption, time.Duration, uint32, error) { delta := &rmpb.Consumption{} for _, calc := range gc.calculators { calc.BeforeKVRequest(delta, info) @@ -1199,6 +1199,7 @@ func (gc *groupCostController) onRequestWait( gc.mu.Lock() add(gc.mu.consumption, delta) gc.mu.Unlock() + var waitDuration time.Duration if !gc.burstable.Load() { var err error @@ -1231,6 +1232,7 @@ func (gc *groupCostController) onRequestWait( } gc.requestRetryCounter.Inc() time.Sleep(retryInterval) + waitDuration += retryInterval } if err != nil { gc.failedRequestCounter.Inc() @@ -1243,9 +1245,10 @@ func (gc *groupCostController) onRequestWait( failpoint.Inject("triggerUpdate", func() { gc.lowRUNotifyChan <- struct{}{} }) - return nil, nil, 0, err + return nil, nil, waitDuration, 0, err } gc.successfulRequestDuration.Observe(d.Seconds()) + waitDuration += d } gc.mu.Lock() @@ -1262,7 +1265,7 @@ func (gc *groupCostController) onRequestWait( *gc.mu.storeCounter[info.StoreID()] = *gc.mu.globalCounter gc.mu.Unlock() - return delta, penalty, gc.getMeta().GetPriority(), nil + return delta, penalty, waitDuration, gc.getMeta().GetPriority(), nil } func (gc *groupCostController) onResponse( diff --git a/client/resource_group/controller/controller_test.go b/client/resource_group/controller/controller_test.go index 1db19787a81..4d09e338222 100644 --- a/client/resource_group/controller/controller_test.go +++ b/client/resource_group/controller/controller_test.go @@ -101,7 +101,7 @@ func TestRequestAndResponseConsumption(t *testing.T) { kvCalculator := gc.getKVCalculator() for idx, testCase := range testCases { caseNum := fmt.Sprintf("case %d", idx) - consumption, _, priority, err := gc.onRequestWait(context.TODO(), testCase.req) + consumption, _, _, priority, err := gc.onRequestWait(context.TODO(), testCase.req) re.NoError(err, caseNum) re.Equal(priority, gc.meta.Priority) expectedConsumption := &rmpb.Consumption{} diff --git a/tests/integrations/mcs/resourcemanager/resource_manager_test.go b/tests/integrations/mcs/resourcemanager/resource_manager_test.go index 91a21caf91b..cc227968877 100644 --- a/tests/integrations/mcs/resourcemanager/resource_manager_test.go +++ b/tests/integrations/mcs/resourcemanager/resource_manager_test.go @@ -438,9 +438,9 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() { rres := cas.tcs[i].makeReadResponse() wres := cas.tcs[i].makeWriteResponse() startTime := time.Now() - _, _, _, err := controller.OnRequestWait(suite.ctx, cas.resourceGroupName, rreq) + _, _, _, _, err := controller.OnRequestWait(suite.ctx, cas.resourceGroupName, rreq) re.NoError(err) - _, _, _, err = controller.OnRequestWait(suite.ctx, cas.resourceGroupName, wreq) + _, _, _, _, err = controller.OnRequestWait(suite.ctx, cas.resourceGroupName, wreq) re.NoError(err) sum += time.Since(startTime) controller.OnResponse(cas.resourceGroupName, rreq, rres) @@ -457,7 +457,7 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() { re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/triggerUpdate", "return(true)")) tcs := tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 900000000, times: 1, waitDuration: 0} wreq := tcs.makeWriteRequest() - _, _, _, err = controller.OnRequestWait(suite.ctx, rg.Name, wreq) + _, _, _, _, err = controller.OnRequestWait(suite.ctx, rg.Name, wreq) re.Error(err) time.Sleep(time.Millisecond * 200) re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerUpdate")) @@ -512,9 +512,9 @@ func (suite *resourceManagerClientTestSuite) TestSwitchBurst() { wreq := tcs.makeWriteRequest() rres := tcs.makeReadResponse() wres := tcs.makeWriteResponse() - _, _, _, err := controller.OnRequestWait(suite.ctx, resourceGroupName, rreq) + _, _, _, _, err := controller.OnRequestWait(suite.ctx, resourceGroupName, rreq) re.NoError(err) - _, _, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName, wreq) + _, _, _, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName, wreq) re.NoError(err) controller.OnResponse(resourceGroupName, rreq, rres) controller.OnResponse(resourceGroupName, wreq, wres) @@ -551,9 +551,9 @@ func (suite *resourceManagerClientTestSuite) TestSwitchBurst() { rres := cas.tcs[i].makeReadResponse() wres := cas.tcs[i].makeWriteResponse() startTime := time.Now() - _, _, _, err := controller.OnRequestWait(suite.ctx, resourceGroupName, rreq) + _, _, _, _, err := controller.OnRequestWait(suite.ctx, resourceGroupName, rreq) re.NoError(err) - _, _, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName, wreq) + _, _, _, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName, wreq) re.NoError(err) sum += time.Since(startTime) controller.OnResponse(resourceGroupName, rreq, rres) @@ -571,14 +571,14 @@ func (suite *resourceManagerClientTestSuite) TestSwitchBurst() { resourceGroupName2 := suite.initGroups[2].Name tcs = tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 100000, times: 1, waitDuration: 0} wreq := tcs.makeWriteRequest() - _, _, _, err := controller.OnRequestWait(suite.ctx, resourceGroupName2, wreq) + _, _, _, _, err := controller.OnRequestWait(suite.ctx, resourceGroupName2, wreq) re.NoError(err) re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/acceleratedSpeedTrend", "return(true)")) resourceGroupName3 := suite.initGroups[3].Name tcs = tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 1000, times: 1, waitDuration: 0} wreq = tcs.makeWriteRequest() - _, _, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName3, wreq) + _, _, _, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName3, wreq) re.NoError(err) time.Sleep(110 * time.Millisecond) tcs = tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 10, times: 1010, waitDuration: 0} @@ -586,7 +586,7 @@ func (suite *resourceManagerClientTestSuite) TestSwitchBurst() { for i := 0; i < tcs.times; i++ { wreq = tcs.makeWriteRequest() startTime := time.Now() - _, _, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName3, wreq) + _, _, _, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName3, wreq) duration += time.Since(startTime) re.NoError(err) } @@ -635,7 +635,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() { // init req := controller.NewTestRequestInfo(false, 0, 2 /* store2 */) resp := controller.NewTestResponseInfo(0, time.Duration(30), true) - _, penalty, _, err := c.OnRequestWait(suite.ctx, resourceGroupName, req) + _, penalty, _, _, err := c.OnRequestWait(suite.ctx, resourceGroupName, req) re.NoError(err) re.Equal(penalty.WriteBytes, 0.0) re.Equal(penalty.TotalCpuTimeMs, 0.0) @@ -644,7 +644,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() { req = controller.NewTestRequestInfo(true, 60, 1 /* store1 */) resp = controller.NewTestResponseInfo(0, time.Duration(10), true) - _, penalty, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req) + _, penalty, _, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req) re.NoError(err) re.Equal(penalty.WriteBytes, 0.0) re.Equal(penalty.TotalCpuTimeMs, 0.0) @@ -654,7 +654,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() { // failed request, shouldn't be counted in penalty req = controller.NewTestRequestInfo(true, 20, 1 /* store1 */) resp = controller.NewTestResponseInfo(0, time.Duration(0), false) - _, penalty, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req) + _, penalty, _, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req) re.NoError(err) re.Equal(penalty.WriteBytes, 0.0) re.Equal(penalty.TotalCpuTimeMs, 0.0) @@ -664,7 +664,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() { // from same store, should be zero req1 := controller.NewTestRequestInfo(false, 0, 1 /* store1 */) resp1 := controller.NewTestResponseInfo(0, time.Duration(10), true) - _, penalty, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req1) + _, penalty, _, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req1) re.NoError(err) re.Equal(penalty.WriteBytes, 0.0) _, err = c.OnResponse(resourceGroupName, req1, resp1) @@ -673,7 +673,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() { // from different store, should be non-zero req2 := controller.NewTestRequestInfo(true, 50, 2 /* store2 */) resp2 := controller.NewTestResponseInfo(0, time.Duration(10), true) - _, penalty, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req2) + _, penalty, _, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req2) re.NoError(err) re.Equal(penalty.WriteBytes, 60.0) re.InEpsilon(penalty.TotalCpuTimeMs, 10.0/1000.0/1000.0, 1e-6) @@ -683,7 +683,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() { // from new store, should be zero req3 := controller.NewTestRequestInfo(true, 0, 3 /* store3 */) resp3 := controller.NewTestResponseInfo(0, time.Duration(10), true) - _, penalty, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req3) + _, penalty, _, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req3) re.NoError(err) re.Equal(penalty.WriteBytes, 0.0) _, err = c.OnResponse(resourceGroupName, req3, resp3) @@ -693,7 +693,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() { resourceGroupName = groupNames[1] req4 := controller.NewTestRequestInfo(true, 50, 1 /* store2 */) resp4 := controller.NewTestResponseInfo(0, time.Duration(10), true) - _, penalty, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req4) + _, penalty, _, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req4) re.NoError(err) re.Equal(penalty.WriteBytes, 0.0) _, err = c.OnResponse(resourceGroupName, req4, resp4)