diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index 4a5b5779b33..241e4ab81bf 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -66,7 +66,7 @@ func logControllerTrace(msg string, fields ...zap.Field) { // ResourceGroupKVInterceptor is used as quota limit controller for resource group using kv store. type ResourceGroupKVInterceptor interface { // OnRequestWait is used to check whether resource group has enough tokens. It maybe needs to wait some time. - OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) (*rmpb.Consumption, *rmpb.Consumption, uint32, error) + OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) (*rmpb.Consumption, *rmpb.Consumption, time.Duration, uint32, error) // OnResponse is used to consume tokens after receiving response. OnResponse(resourceGroupName string, req RequestInfo, resp ResponseInfo) (*rmpb.Consumption, error) // IsBackgroundRequest If the resource group has background jobs, we should not record consumption and wait for it. @@ -538,10 +538,10 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context, // OnRequestWait is used to check whether resource group has enough tokens. It maybe needs to wait some time. func (c *ResourceGroupsController) OnRequestWait( ctx context.Context, resourceGroupName string, info RequestInfo, -) (*rmpb.Consumption, *rmpb.Consumption, uint32, error) { +) (*rmpb.Consumption, *rmpb.Consumption, time.Duration, uint32, error) { gc, err := c.tryGetResourceGroup(ctx, resourceGroupName) if err != nil { - return nil, nil, 0, err + return nil, nil, time.Duration(0), 0, err } return gc.onRequestWait(ctx, info) } @@ -1190,7 +1190,7 @@ func (gc *groupCostController) calcRequest(counter *tokenCounter) float64 { func (gc *groupCostController) onRequestWait( ctx context.Context, info RequestInfo, -) (*rmpb.Consumption, *rmpb.Consumption, uint32, error) { +) (*rmpb.Consumption, *rmpb.Consumption, time.Duration, uint32, error) { delta := &rmpb.Consumption{} for _, calc := range gc.calculators { calc.BeforeKVRequest(delta, info) @@ -1199,6 +1199,7 @@ func (gc *groupCostController) onRequestWait( gc.mu.Lock() add(gc.mu.consumption, delta) gc.mu.Unlock() + var waitDuration time.Duration if !gc.burstable.Load() { var err error @@ -1231,6 +1232,7 @@ func (gc *groupCostController) onRequestWait( } gc.requestRetryCounter.Inc() time.Sleep(retryInterval) + waitDuration += retryInterval } if err != nil { gc.failedRequestCounter.Inc() @@ -1243,9 +1245,10 @@ func (gc *groupCostController) onRequestWait( failpoint.Inject("triggerUpdate", func() { gc.lowRUNotifyChan <- struct{}{} }) - return nil, nil, 0, err + return nil, nil, waitDuration, 0, err } gc.successfulRequestDuration.Observe(d.Seconds()) + waitDuration += d } gc.mu.Lock() @@ -1262,7 +1265,7 @@ func (gc *groupCostController) onRequestWait( *gc.mu.storeCounter[info.StoreID()] = *gc.mu.globalCounter gc.mu.Unlock() - return delta, penalty, gc.getMeta().GetPriority(), nil + return delta, penalty, waitDuration, gc.getMeta().GetPriority(), nil } func (gc *groupCostController) onResponse( diff --git a/client/resource_group/controller/controller_test.go b/client/resource_group/controller/controller_test.go index 1db19787a81..4d09e338222 100644 --- a/client/resource_group/controller/controller_test.go +++ b/client/resource_group/controller/controller_test.go @@ -101,7 +101,7 @@ func TestRequestAndResponseConsumption(t *testing.T) { kvCalculator := gc.getKVCalculator() for idx, testCase := range testCases { caseNum := fmt.Sprintf("case %d", idx) - consumption, _, priority, err := gc.onRequestWait(context.TODO(), testCase.req) + consumption, _, _, priority, err := gc.onRequestWait(context.TODO(), testCase.req) re.NoError(err, caseNum) re.Equal(priority, gc.meta.Priority) expectedConsumption := &rmpb.Consumption{} diff --git a/tests/integrations/mcs/resourcemanager/resource_manager_test.go b/tests/integrations/mcs/resourcemanager/resource_manager_test.go index b98686222fe..076e9b99531 100644 --- a/tests/integrations/mcs/resourcemanager/resource_manager_test.go +++ b/tests/integrations/mcs/resourcemanager/resource_manager_test.go @@ -440,9 +440,9 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() { rres := cas.tcs[i].makeReadResponse() wres := cas.tcs[i].makeWriteResponse() startTime := time.Now() - _, _, _, err := controller.OnRequestWait(suite.ctx, cas.resourceGroupName, rreq) + _, _, _, _, err := controller.OnRequestWait(suite.ctx, cas.resourceGroupName, rreq) re.NoError(err) - _, _, _, err = controller.OnRequestWait(suite.ctx, cas.resourceGroupName, wreq) + _, _, _, _, err = controller.OnRequestWait(suite.ctx, cas.resourceGroupName, wreq) re.NoError(err) sum += time.Since(startTime) controller.OnResponse(cas.resourceGroupName, rreq, rres) @@ -459,7 +459,7 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() { re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/triggerUpdate", "return(true)")) tcs := tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 900000000, times: 1, waitDuration: 0} wreq := tcs.makeWriteRequest() - _, _, _, err = controller.OnRequestWait(suite.ctx, rg.Name, wreq) + _, _, _, _, err = controller.OnRequestWait(suite.ctx, rg.Name, wreq) re.Error(err) time.Sleep(time.Millisecond * 200) re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerUpdate")) @@ -514,9 +514,9 @@ func (suite *resourceManagerClientTestSuite) TestSwitchBurst() { wreq := tcs.makeWriteRequest() rres := tcs.makeReadResponse() wres := tcs.makeWriteResponse() - _, _, _, err := controller.OnRequestWait(suite.ctx, resourceGroupName, rreq) + _, _, _, _, err := controller.OnRequestWait(suite.ctx, resourceGroupName, rreq) re.NoError(err) - _, _, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName, wreq) + _, _, _, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName, wreq) re.NoError(err) controller.OnResponse(resourceGroupName, rreq, rres) controller.OnResponse(resourceGroupName, wreq, wres) @@ -553,9 +553,9 @@ func (suite *resourceManagerClientTestSuite) TestSwitchBurst() { rres := cas.tcs[i].makeReadResponse() wres := cas.tcs[i].makeWriteResponse() startTime := time.Now() - _, _, _, err := controller.OnRequestWait(suite.ctx, resourceGroupName, rreq) + _, _, _, _, err := controller.OnRequestWait(suite.ctx, resourceGroupName, rreq) re.NoError(err) - _, _, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName, wreq) + _, _, _, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName, wreq) re.NoError(err) sum += time.Since(startTime) controller.OnResponse(resourceGroupName, rreq, rres) @@ -573,14 +573,14 @@ func (suite *resourceManagerClientTestSuite) TestSwitchBurst() { resourceGroupName2 := suite.initGroups[2].Name tcs = tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 100000, times: 1, waitDuration: 0} wreq := tcs.makeWriteRequest() - _, _, _, err := controller.OnRequestWait(suite.ctx, resourceGroupName2, wreq) + _, _, _, _, err := controller.OnRequestWait(suite.ctx, resourceGroupName2, wreq) re.NoError(err) re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/acceleratedSpeedTrend", "return(true)")) resourceGroupName3 := suite.initGroups[3].Name tcs = tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 1000, times: 1, waitDuration: 0} wreq = tcs.makeWriteRequest() - _, _, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName3, wreq) + _, _, _, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName3, wreq) re.NoError(err) time.Sleep(110 * time.Millisecond) tcs = tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 10, times: 1010, waitDuration: 0} @@ -588,7 +588,7 @@ func (suite *resourceManagerClientTestSuite) TestSwitchBurst() { for i := 0; i < tcs.times; i++ { wreq = tcs.makeWriteRequest() startTime := time.Now() - _, _, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName3, wreq) + _, _, _, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName3, wreq) duration += time.Since(startTime) re.NoError(err) } @@ -637,7 +637,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() { // init req := controller.NewTestRequestInfo(false, 0, 2 /* store2 */) resp := controller.NewTestResponseInfo(0, time.Duration(30), true) - _, penalty, _, err := c.OnRequestWait(suite.ctx, resourceGroupName, req) + _, penalty, _, _, err := c.OnRequestWait(suite.ctx, resourceGroupName, req) re.NoError(err) re.Equal(penalty.WriteBytes, 0.0) re.Equal(penalty.TotalCpuTimeMs, 0.0) @@ -646,7 +646,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() { req = controller.NewTestRequestInfo(true, 60, 1 /* store1 */) resp = controller.NewTestResponseInfo(0, time.Duration(10), true) - _, penalty, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req) + _, penalty, _, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req) re.NoError(err) re.Equal(penalty.WriteBytes, 0.0) re.Equal(penalty.TotalCpuTimeMs, 0.0) @@ -656,7 +656,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() { // failed request, shouldn't be counted in penalty req = controller.NewTestRequestInfo(true, 20, 1 /* store1 */) resp = controller.NewTestResponseInfo(0, time.Duration(0), false) - _, penalty, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req) + _, penalty, _, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req) re.NoError(err) re.Equal(penalty.WriteBytes, 0.0) re.Equal(penalty.TotalCpuTimeMs, 0.0) @@ -666,7 +666,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() { // from same store, should be zero req1 := controller.NewTestRequestInfo(false, 0, 1 /* store1 */) resp1 := controller.NewTestResponseInfo(0, time.Duration(10), true) - _, penalty, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req1) + _, penalty, _, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req1) re.NoError(err) re.Equal(penalty.WriteBytes, 0.0) _, err = c.OnResponse(resourceGroupName, req1, resp1) @@ -675,7 +675,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() { // from different store, should be non-zero req2 := controller.NewTestRequestInfo(true, 50, 2 /* store2 */) resp2 := controller.NewTestResponseInfo(0, time.Duration(10), true) - _, penalty, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req2) + _, penalty, _, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req2) re.NoError(err) re.Equal(penalty.WriteBytes, 60.0) re.InEpsilon(penalty.TotalCpuTimeMs, 10.0/1000.0/1000.0, 1e-6) @@ -685,7 +685,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() { // from new store, should be zero req3 := controller.NewTestRequestInfo(true, 0, 3 /* store3 */) resp3 := controller.NewTestResponseInfo(0, time.Duration(10), true) - _, penalty, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req3) + _, penalty, _, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req3) re.NoError(err) re.Equal(penalty.WriteBytes, 0.0) _, err = c.OnResponse(resourceGroupName, req3, resp3) @@ -695,7 +695,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() { resourceGroupName = groupNames[1] req4 := controller.NewTestRequestInfo(true, 50, 1 /* store2 */) resp4 := controller.NewTestResponseInfo(0, time.Duration(10), true) - _, penalty, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req4) + _, penalty, _, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req4) re.NoError(err) re.Equal(penalty.WriteBytes, 0.0) _, err = c.OnResponse(resourceGroupName, req4, resp4)