From f578a36a651e9fbe2ecf8c678c441b8d2e02af8c Mon Sep 17 00:00:00 2001 From: Yexiang Zhang Date: Tue, 28 Nov 2023 10:31:16 +0800 Subject: [PATCH 01/14] dashboard: update version to v20231127.1 (#7458) close tikv/pd#7457 Signed-off-by: mornyx --- go.mod | 2 +- go.sum | 4 ++-- tests/integrations/client/go.mod | 2 +- tests/integrations/client/go.sum | 4 ++-- tests/integrations/mcs/go.mod | 2 +- tests/integrations/mcs/go.sum | 4 ++-- tests/integrations/tso/go.mod | 2 +- tests/integrations/tso/go.sum | 4 ++-- 8 files changed, 12 insertions(+), 12 deletions(-) diff --git a/go.mod b/go.mod index e4171caea8c..676d350d22d 100644 --- a/go.mod +++ b/go.mod @@ -36,7 +36,7 @@ require ( github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 - github.com/pingcap/tidb-dashboard v0.0.0-20231108071238-7cb8b7ff0537 + github.com/pingcap/tidb-dashboard v0.0.0-20231127105651-ce4097837c5e github.com/prometheus/client_golang v1.11.1 github.com/prometheus/common v0.26.0 github.com/sasha-s/go-deadlock v0.2.0 diff --git a/go.sum b/go.sum index 76f98d677ee..c7ceeee028c 100644 --- a/go.sum +++ b/go.sum @@ -466,8 +466,8 @@ github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM= -github.com/pingcap/tidb-dashboard v0.0.0-20231108071238-7cb8b7ff0537 h1:wnHt7ETIB0vm+gbLx8QhcIEmRtrT4QlWlfpcI9vjxOk= -github.com/pingcap/tidb-dashboard v0.0.0-20231108071238-7cb8b7ff0537/go.mod h1:EZ90+V5S4TttbYag6oKZ3jcNKRwZe1Mc9vXwOt9JBYw= +github.com/pingcap/tidb-dashboard v0.0.0-20231127105651-ce4097837c5e h1:SJUSDejvKtj9vSh5ptRHh4iMrvPV3oKO8yp6/SYE8vc= +github.com/pingcap/tidb-dashboard v0.0.0-20231127105651-ce4097837c5e/go.mod h1:ucZBRz52icb23T/5Z4CsuUHmarYiin7p2MeiVBe+o8c= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= diff --git a/tests/integrations/client/go.mod b/tests/integrations/client/go.mod index a3c85eeb2fe..799901ff2e3 100644 --- a/tests/integrations/client/go.mod +++ b/tests/integrations/client/go.mod @@ -123,7 +123,7 @@ require ( github.com/pingcap/errcode v0.3.0 // indirect github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 // indirect - github.com/pingcap/tidb-dashboard v0.0.0-20231108071238-7cb8b7ff0537 // indirect + github.com/pingcap/tidb-dashboard v0.0.0-20231127105651-ce4097837c5e // indirect github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/tests/integrations/client/go.sum b/tests/integrations/client/go.sum index c16faee2e7a..e13da5d8375 100644 --- a/tests/integrations/client/go.sum +++ b/tests/integrations/client/go.sum @@ -430,8 +430,8 @@ github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8 
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM= -github.com/pingcap/tidb-dashboard v0.0.0-20231108071238-7cb8b7ff0537 h1:wnHt7ETIB0vm+gbLx8QhcIEmRtrT4QlWlfpcI9vjxOk= -github.com/pingcap/tidb-dashboard v0.0.0-20231108071238-7cb8b7ff0537/go.mod h1:EZ90+V5S4TttbYag6oKZ3jcNKRwZe1Mc9vXwOt9JBYw= +github.com/pingcap/tidb-dashboard v0.0.0-20231127105651-ce4097837c5e h1:SJUSDejvKtj9vSh5ptRHh4iMrvPV3oKO8yp6/SYE8vc= +github.com/pingcap/tidb-dashboard v0.0.0-20231127105651-ce4097837c5e/go.mod h1:ucZBRz52icb23T/5Z4CsuUHmarYiin7p2MeiVBe+o8c= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= diff --git a/tests/integrations/mcs/go.mod b/tests/integrations/mcs/go.mod index 216cdad2512..75d70e3cf06 100644 --- a/tests/integrations/mcs/go.mod +++ b/tests/integrations/mcs/go.mod @@ -123,7 +123,7 @@ require ( github.com/pingcap/errcode v0.3.0 // indirect github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 // indirect - github.com/pingcap/tidb-dashboard v0.0.0-20231108071238-7cb8b7ff0537 // indirect + github.com/pingcap/tidb-dashboard v0.0.0-20231127105651-ce4097837c5e // indirect github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/tests/integrations/mcs/go.sum b/tests/integrations/mcs/go.sum index c36a3dd2d4c..dfead54afe1 100644 --- a/tests/integrations/mcs/go.sum +++ b/tests/integrations/mcs/go.sum @@ -434,8 +434,8 @@ github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM= -github.com/pingcap/tidb-dashboard v0.0.0-20231108071238-7cb8b7ff0537 h1:wnHt7ETIB0vm+gbLx8QhcIEmRtrT4QlWlfpcI9vjxOk= -github.com/pingcap/tidb-dashboard v0.0.0-20231108071238-7cb8b7ff0537/go.mod h1:EZ90+V5S4TttbYag6oKZ3jcNKRwZe1Mc9vXwOt9JBYw= +github.com/pingcap/tidb-dashboard v0.0.0-20231127105651-ce4097837c5e h1:SJUSDejvKtj9vSh5ptRHh4iMrvPV3oKO8yp6/SYE8vc= +github.com/pingcap/tidb-dashboard v0.0.0-20231127105651-ce4097837c5e/go.mod h1:ucZBRz52icb23T/5Z4CsuUHmarYiin7p2MeiVBe+o8c= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= diff --git a/tests/integrations/tso/go.mod b/tests/integrations/tso/go.mod index 17c432ba810..309ea9dbc4d 100644 --- a/tests/integrations/tso/go.mod +++ b/tests/integrations/tso/go.mod @@ -121,7 +121,7 
@@ require ( github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 // indirect - github.com/pingcap/tidb-dashboard v0.0.0-20231108071238-7cb8b7ff0537 // indirect + github.com/pingcap/tidb-dashboard v0.0.0-20231127105651-ce4097837c5e // indirect github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/tests/integrations/tso/go.sum b/tests/integrations/tso/go.sum index fb87df47bcd..94fbde2ad57 100644 --- a/tests/integrations/tso/go.sum +++ b/tests/integrations/tso/go.sum @@ -428,8 +428,8 @@ github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM= -github.com/pingcap/tidb-dashboard v0.0.0-20231108071238-7cb8b7ff0537 h1:wnHt7ETIB0vm+gbLx8QhcIEmRtrT4QlWlfpcI9vjxOk= -github.com/pingcap/tidb-dashboard v0.0.0-20231108071238-7cb8b7ff0537/go.mod h1:EZ90+V5S4TttbYag6oKZ3jcNKRwZe1Mc9vXwOt9JBYw= +github.com/pingcap/tidb-dashboard v0.0.0-20231127105651-ce4097837c5e h1:SJUSDejvKtj9vSh5ptRHh4iMrvPV3oKO8yp6/SYE8vc= +github.com/pingcap/tidb-dashboard v0.0.0-20231127105651-ce4097837c5e/go.mod h1:ucZBRz52icb23T/5Z4CsuUHmarYiin7p2MeiVBe+o8c= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= From c8257ed6c5ec47f000a2428721baf94621cc6c81 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 28 Nov 2023 11:37:16 +0800 Subject: [PATCH 02/14] *: make `TestUpdateDefaultReplicaConfig` stable (#7455) close tikv/pd#7410 Signed-off-by: Ryan Leung Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- tests/pdctl/config/config_test.go | 48 +++++++++++++++---------------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/tests/pdctl/config/config_test.go b/tests/pdctl/config/config_test.go index badccd9becc..3f82be44399 100644 --- a/tests/pdctl/config/config_test.go +++ b/tests/pdctl/config/config_test.go @@ -683,66 +683,66 @@ func (suite *configTestSuite) checkUpdateDefaultReplicaConfig(cluster *tests.Tes checkMaxReplicas := func(expect uint64) { args := []string{"-u", pdAddr, "config", "show", "replication"} - output, err := pdctl.ExecuteCommand(cmd, args...) - re.NoError(err) - replicationCfg := sc.ReplicationConfig{} - re.NoError(json.Unmarshal(output, &replicationCfg)) testutil.Eventually(re, func() bool { // wait for the config to be synced to the scheduling server + output, err := pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + replicationCfg := sc.ReplicationConfig{} + re.NoError(json.Unmarshal(output, &replicationCfg)) return replicationCfg.MaxReplicas == expect }) } checkLocationLabels := func(expect int) { args := []string{"-u", pdAddr, "config", "show", "replication"} - output, err := pdctl.ExecuteCommand(cmd, args...) 
- re.NoError(err) - replicationCfg := sc.ReplicationConfig{} - re.NoError(json.Unmarshal(output, &replicationCfg)) testutil.Eventually(re, func() bool { // wait for the config to be synced to the scheduling server + output, err := pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + replicationCfg := sc.ReplicationConfig{} + re.NoError(json.Unmarshal(output, &replicationCfg)) return len(replicationCfg.LocationLabels) == expect }) } checkIsolationLevel := func(expect string) { args := []string{"-u", pdAddr, "config", "show", "replication"} - output, err := pdctl.ExecuteCommand(cmd, args...) - re.NoError(err) - replicationCfg := sc.ReplicationConfig{} - re.NoError(json.Unmarshal(output, &replicationCfg)) testutil.Eventually(re, func() bool { // wait for the config to be synced to the scheduling server + output, err := pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + replicationCfg := sc.ReplicationConfig{} + re.NoError(json.Unmarshal(output, &replicationCfg)) return replicationCfg.IsolationLevel == expect }) } checkRuleCount := func(expect int) { args := []string{"-u", pdAddr, "config", "placement-rules", "show", "--group", placement.DefaultGroupID, "--id", placement.DefaultRuleID} - output, err := pdctl.ExecuteCommand(cmd, args...) - re.NoError(err) - rule := placement.Rule{} - re.NoError(json.Unmarshal(output, &rule)) testutil.Eventually(re, func() bool { // wait for the config to be synced to the scheduling server + output, err := pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + rule := placement.Rule{} + re.NoError(json.Unmarshal(output, &rule)) return rule.Count == expect }) } checkRuleLocationLabels := func(expect int) { args := []string{"-u", pdAddr, "config", "placement-rules", "show", "--group", placement.DefaultGroupID, "--id", placement.DefaultRuleID} - output, err := pdctl.ExecuteCommand(cmd, args...) - re.NoError(err) - rule := placement.Rule{} - re.NoError(json.Unmarshal(output, &rule)) testutil.Eventually(re, func() bool { // wait for the config to be synced to the scheduling server + output, err := pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + rule := placement.Rule{} + re.NoError(json.Unmarshal(output, &rule)) return len(rule.LocationLabels) == expect }) } checkRuleIsolationLevel := func(expect string) { args := []string{"-u", pdAddr, "config", "placement-rules", "show", "--group", placement.DefaultGroupID, "--id", placement.DefaultRuleID} - output, err := pdctl.ExecuteCommand(cmd, args...) - re.NoError(err) - rule := placement.Rule{} - re.NoError(json.Unmarshal(output, &rule)) testutil.Eventually(re, func() bool { // wait for the config to be synced to the scheduling server + output, err := pdctl.ExecuteCommand(cmd, args...) 
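PATCH 02 stabilizes TestUpdateDefaultReplicaConfig by moving the pdctl call and the JSON decoding inside testutil.Eventually, so the whole read-and-check sequence is retried until the scheduling server has synced the new configuration. A minimal, self-contained sketch of that poll-until-true pattern follows; the eventually helper and the simulated config value are illustrative stand-ins, not PD's actual testutil API.

package main

import (
	"errors"
	"fmt"
	"time"
)

// eventually polls cond until it returns true or waitFor elapses.
// It is an illustrative stand-in for testutil.Eventually used in the patch.
func eventually(cond func() bool, waitFor, tick time.Duration) error {
	deadline := time.Now().Add(waitFor)
	for time.Now().Before(deadline) {
		if cond() {
			return nil
		}
		time.Sleep(tick)
	}
	return errors.New("condition was not met before the deadline")
}

func main() {
	// syncedMaxReplicas simulates a value that only becomes visible on the
	// scheduling server after some propagation delay.
	start := time.Now()
	syncedMaxReplicas := func() uint64 {
		if time.Since(start) > 300*time.Millisecond {
			return 5
		}
		return 3
	}
	// Re-run the whole read-and-check sequence on every attempt, as the
	// patched helpers do, instead of reading once and asserting repeatedly.
	err := eventually(func() bool {
		return syncedMaxReplicas() == 5
	}, 3*time.Second, 100*time.Millisecond)
	fmt.Println("synced:", err == nil)
}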
+ re.NoError(err) + rule := placement.Rule{} + re.NoError(json.Unmarshal(output, &rule)) return rule.IsolationLevel == expect }) } From a6e855eef6744adfac232833769219be4f806756 Mon Sep 17 00:00:00 2001 From: Yongbo Jiang Date: Tue, 28 Nov 2023 14:40:16 +0800 Subject: [PATCH 03/14] *: remove improper method in ServiceDiscovery (#7456) ref tikv/pd#4399 Signed-off-by: Cabinfever_B Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- client/pd_service_discovery.go | 4 +--- client/tso_service_discovery.go | 17 +---------------- pkg/utils/grpcutil/grpcutil.go | 1 + server/config/persist_options.go | 1 + 4 files changed, 4 insertions(+), 19 deletions(-) diff --git a/client/pd_service_discovery.go b/client/pd_service_discovery.go index e96093f598d..98ddd611326 100644 --- a/client/pd_service_discovery.go +++ b/client/pd_service_discovery.go @@ -63,8 +63,6 @@ type ServiceDiscovery interface { GetKeyspaceID() uint32 // GetKeyspaceGroupID returns the ID of the keyspace group GetKeyspaceGroupID() uint32 - // DiscoverServiceURLs discovers the microservice with the specified type and returns the server urls. - DiscoverMicroservice(svcType serviceType) ([]string, error) // GetServiceURLs returns the URLs of the servers providing the service GetServiceURLs() []string // GetServingEndpointClientConn returns the grpc client connection of the serving endpoint @@ -324,7 +322,7 @@ func (c *pdServiceDiscovery) GetKeyspaceGroupID() uint32 { } // DiscoverMicroservice discovers the microservice with the specified type and returns the server urls. -func (c *pdServiceDiscovery) DiscoverMicroservice(svcType serviceType) (urls []string, err error) { +func (c *pdServiceDiscovery) discoverMicroservice(svcType serviceType) (urls []string, err error) { switch svcType { case apiService: urls = c.GetServiceURLs() diff --git a/client/tso_service_discovery.go b/client/tso_service_discovery.go index 92f95129951..5f14c406797 100644 --- a/client/tso_service_discovery.go +++ b/client/tso_service_discovery.go @@ -288,21 +288,6 @@ func (c *tsoServiceDiscovery) GetKeyspaceGroupID() uint32 { return c.keyspaceGroupSD.group.Id } -// DiscoverServiceURLs discovers the microservice with the specified type and returns the server urls. -func (c *tsoServiceDiscovery) DiscoverMicroservice(svcType serviceType) ([]string, error) { - var urls []string - - switch svcType { - case apiService: - case tsoService: - return c.apiSvcDiscovery.DiscoverMicroservice(tsoService) - default: - panic("invalid service type") - } - - return urls, nil -} - // GetServiceURLs returns the URLs of the tso primary/secondary addresses of this keyspace group. // For testing use. It should only be called when the client is closed. 
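PATCH 03 drops DiscoverMicroservice from the exported ServiceDiscovery interface and keeps the logic only as an unexported method on the concrete pdServiceDiscovery, reached through a type assertion at its single internal call site. A small sketch of that interface-narrowing pattern, with hypothetical types and URLs:

package main

import "fmt"

// ServiceDiscovery is the narrowed public surface.
type ServiceDiscovery interface {
	GetServiceURLs() []string
}

type pdServiceDiscovery struct {
	urls []string
}

func (d *pdServiceDiscovery) GetServiceURLs() []string { return d.urls }

// discoverMicroservice stays unexported: it is an implementation detail
// that only the PD-backed discovery can answer.
func (d *pdServiceDiscovery) discoverMicroservice(svc string) []string {
	if svc == "tso" {
		return []string{"http://tso-0:3379"}
	}
	return d.urls
}

func main() {
	var sd ServiceDiscovery = &pdServiceDiscovery{urls: []string{"http://pd-0:2379"}}
	// Internal call sites that really need the detail assert the concrete type,
	// mirroring sd.(*pdServiceDiscovery).discoverMicroservice(tsoService) in the patch.
	if impl, ok := sd.(*pdServiceDiscovery); ok {
		fmt.Println(impl.discoverMicroservice("tso"))
	}
}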
func (c *tsoServiceDiscovery) GetServiceURLs() []string { @@ -582,7 +567,7 @@ func (c *tsoServiceDiscovery) getTSOServer(sd ServiceDiscovery) (string, error) ) t := c.tsoServerDiscovery if len(t.addrs) == 0 || t.failureCount == len(t.addrs) { - addrs, err = sd.DiscoverMicroservice(tsoService) + addrs, err = sd.(*pdServiceDiscovery).discoverMicroservice(tsoService) if err != nil { return "", err } diff --git a/pkg/utils/grpcutil/grpcutil.go b/pkg/utils/grpcutil/grpcutil.go index 44d45ff4c70..a001ec4bd03 100644 --- a/pkg/utils/grpcutil/grpcutil.go +++ b/pkg/utils/grpcutil/grpcutil.go @@ -163,6 +163,7 @@ func GetForwardedHost(ctx context.Context) string { md, ok := metadata.FromIncomingContext(ctx) if !ok { log.Debug("failed to get forwarding metadata") + return "" } if t, ok := md[ForwardMetadataKey]; ok { return t[0] diff --git a/server/config/persist_options.go b/server/config/persist_options.go index 42ab91fce17..ae9047c626b 100644 --- a/server/config/persist_options.go +++ b/server/config/persist_options.go @@ -1024,6 +1024,7 @@ func (o *PersistOptions) IsRaftKV2() bool { } // SetRegionBucketEnabled sets if the region bucket is enabled. +// only used for test. func (o *PersistOptions) SetRegionBucketEnabled(enabled bool) { cfg := o.GetStoreConfig().Clone() cfg.SetRegionBucketEnabled(enabled) From 8c8b4d4c78701bfda0dc247caa945684fc3b08f9 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Wed, 29 Nov 2023 09:17:46 +0800 Subject: [PATCH 04/14] *: make TestConfig stable (#7463) close tikv/pd#7440 Signed-off-by: Ryan Leung Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/dashboard/adapter/manager.go | 5 +++++ tests/pdctl/config/config_test.go | 3 +++ 2 files changed, 8 insertions(+) diff --git a/pkg/dashboard/adapter/manager.go b/pkg/dashboard/adapter/manager.go index a3691242c8f..293d8ad6549 100644 --- a/pkg/dashboard/adapter/manager.go +++ b/pkg/dashboard/adapter/manager.go @@ -19,6 +19,7 @@ import ( "sync" "time" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" "github.com/pingcap/tidb-dashboard/pkg/apiserver" @@ -75,6 +76,10 @@ func (m *Manager) Stop() { func (m *Manager) serviceLoop() { defer logutil.LogPanic() defer m.wg.Done() + // TODO: After we fix the atomic problem of config, we can remove this failpoint. 
+ failpoint.Inject("skipDashboardLoop", func() { + failpoint.Return() + }) ticker := time.NewTicker(CheckInterval) defer ticker.Stop() diff --git a/tests/pdctl/config/config_test.go b/tests/pdctl/config/config_test.go index 3f82be44399..3b3310185b1 100644 --- a/tests/pdctl/config/config_test.go +++ b/tests/pdctl/config/config_test.go @@ -24,6 +24,7 @@ import ( "time" "github.com/coreos/go-semver/semver" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -60,8 +61,10 @@ func TestConfigTestSuite(t *testing.T) { } func (suite *configTestSuite) TestConfig() { + suite.NoError(failpoint.Enable("github.com/tikv/pd/pkg/dashboard/adapter/skipDashboardLoop", `return(true)`)) env := tests.NewSchedulingTestEnvironment(suite.T()) env.RunTestInTwoModes(suite.checkConfig) + suite.NoError(failpoint.Disable("github.com/tikv/pd/pkg/dashboard/adapter/skipDashboardLoop")) } func (suite *configTestSuite) checkConfig(cluster *tests.TestCluster) { From 4356aebb55f453cf91116a22461f4ab375c9ce11 Mon Sep 17 00:00:00 2001 From: David <8039876+AmoebaProtozoa@users.noreply.github.com> Date: Wed, 29 Nov 2023 11:43:17 +0800 Subject: [PATCH 05/14] pd-ctl: retry when keyspace group manager not initialized (#7442) close tikv/pd#7441 --- server/apiv2/handlers/keyspace.go | 2 +- server/apiv2/handlers/tso_keyspace_group.go | 25 ++++++++-------- tests/pdctl/keyspace/keyspace_test.go | 29 +++++++++++++++++++ .../pd-ctl/pdctl/command/keyspace_command.go | 28 ++++++++++++++---- 4 files changed, 65 insertions(+), 19 deletions(-) diff --git a/server/apiv2/handlers/keyspace.go b/server/apiv2/handlers/keyspace.go index 9602cc863ef..c2802bb939d 100644 --- a/server/apiv2/handlers/keyspace.go +++ b/server/apiv2/handlers/keyspace.go @@ -113,7 +113,7 @@ func LoadKeyspace(c *gin.Context) { if value, ok := c.GetQuery("force_refresh_group_id"); ok && value == "true" { groupManager := svr.GetKeyspaceGroupManager() if groupManager == nil { - c.AbortWithStatusJSON(http.StatusInternalServerError, managerUninitializedErr) + c.AbortWithStatusJSON(http.StatusInternalServerError, GroupManagerUninitializedErr) return } // keyspace has been checked in LoadKeyspace, so no need to check again. diff --git a/server/apiv2/handlers/tso_keyspace_group.go b/server/apiv2/handlers/tso_keyspace_group.go index a580b21f705..a9f042687f6 100644 --- a/server/apiv2/handlers/tso_keyspace_group.go +++ b/server/apiv2/handlers/tso_keyspace_group.go @@ -30,7 +30,8 @@ import ( "github.com/tikv/pd/server/apiv2/middlewares" ) -const groupManagerUninitializedErr = "keyspace group manager is not initialized" +// GroupManagerUninitializedErr is the error message for uninitialized keyspace group manager. +const GroupManagerUninitializedErr = "keyspace group manager is not initialized" // RegisterTSOKeyspaceGroup registers keyspace group handlers to the server. 
func RegisterTSOKeyspaceGroup(r *gin.RouterGroup) { @@ -78,7 +79,7 @@ func CreateKeyspaceGroups(c *gin.Context) { svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) manager := svr.GetKeyspaceGroupManager() if manager == nil { - c.AbortWithStatusJSON(http.StatusInternalServerError, groupManagerUninitializedErr) + c.AbortWithStatusJSON(http.StatusInternalServerError, GroupManagerUninitializedErr) return } err = manager.CreateKeyspaceGroups(createParams.KeyspaceGroups) @@ -101,7 +102,7 @@ func GetKeyspaceGroups(c *gin.Context) { svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) manager := svr.GetKeyspaceGroupManager() if manager == nil { - c.AbortWithStatusJSON(http.StatusInternalServerError, groupManagerUninitializedErr) + c.AbortWithStatusJSON(http.StatusInternalServerError, GroupManagerUninitializedErr) return } keyspaceGroups, err := manager.GetKeyspaceGroups(scanStart, scanLimit) @@ -152,7 +153,7 @@ func GetKeyspaceGroupByID(c *gin.Context) { svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) manager := svr.GetKeyspaceGroupManager() if manager == nil { - c.AbortWithStatusJSON(http.StatusInternalServerError, groupManagerUninitializedErr) + c.AbortWithStatusJSON(http.StatusInternalServerError, GroupManagerUninitializedErr) return } @@ -189,7 +190,7 @@ func DeleteKeyspaceGroupByID(c *gin.Context) { svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) manager := svr.GetKeyspaceGroupManager() if manager == nil { - c.AbortWithStatusJSON(http.StatusInternalServerError, groupManagerUninitializedErr) + c.AbortWithStatusJSON(http.StatusInternalServerError, GroupManagerUninitializedErr) return } kg, err := manager.DeleteKeyspaceGroupByID(id) @@ -250,7 +251,7 @@ func SplitKeyspaceGroupByID(c *gin.Context) { } groupManager := svr.GetKeyspaceGroupManager() if groupManager == nil { - c.AbortWithStatusJSON(http.StatusInternalServerError, groupManagerUninitializedErr) + c.AbortWithStatusJSON(http.StatusInternalServerError, GroupManagerUninitializedErr) return } @@ -289,7 +290,7 @@ func FinishSplitKeyspaceByID(c *gin.Context) { svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) manager := svr.GetKeyspaceGroupManager() if manager == nil { - c.AbortWithStatusJSON(http.StatusInternalServerError, groupManagerUninitializedErr) + c.AbortWithStatusJSON(http.StatusInternalServerError, GroupManagerUninitializedErr) return } err = manager.FinishSplitKeyspaceByID(id) @@ -337,7 +338,7 @@ func MergeKeyspaceGroups(c *gin.Context) { svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) groupManager := svr.GetKeyspaceGroupManager() if groupManager == nil { - c.AbortWithStatusJSON(http.StatusInternalServerError, groupManagerUninitializedErr) + c.AbortWithStatusJSON(http.StatusInternalServerError, GroupManagerUninitializedErr) return } // Merge keyspace group. 
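PATCH 05 exports GroupManagerUninitializedErr so pd-ctl can recognize the message and retry, while every keyspace-group handler keeps the same guard: fetch the manager and abort with HTTP 500 carrying that message if it has not been initialized yet. A compact sketch of that guard using gin; the route, manager type, and lookup function here are assumptions, not PD's actual wiring.

package main

import (
	"net/http"

	"github.com/gin-gonic/gin"
)

// GroupManagerUninitializedErr mirrors the exported message in the patch so
// clients can match on it and retry.
const GroupManagerUninitializedErr = "keyspace group manager is not initialized"

// keyspaceGroupManager is a stand-in for the real manager type.
type keyspaceGroupManager struct{}

// getManager simulates svr.GetKeyspaceGroupManager(), which returns nil until
// the keyspace group service has finished starting.
var getManager = func() *keyspaceGroupManager { return nil }

func main() {
	r := gin.Default()
	r.GET("/tso/keyspace-groups", func(c *gin.Context) {
		manager := getManager()
		if manager == nil {
			// Same guard as every handler in the patch: report the exported
			// message so callers can tell "not ready" apart from other errors.
			c.AbortWithStatusJSON(http.StatusInternalServerError, GroupManagerUninitializedErr)
			return
		}
		c.JSON(http.StatusOK, gin.H{"groups": []string{}})
	})
	_ = r.Run(":8080") // illustrative address
}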
@@ -364,7 +365,7 @@ func FinishMergeKeyspaceByID(c *gin.Context) { svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) manager := svr.GetKeyspaceGroupManager() if manager == nil { - c.AbortWithStatusJSON(http.StatusInternalServerError, groupManagerUninitializedErr) + c.AbortWithStatusJSON(http.StatusInternalServerError, GroupManagerUninitializedErr) return } err = manager.FinishMergeKeyspaceByID(id) @@ -390,7 +391,7 @@ func AllocNodesForKeyspaceGroup(c *gin.Context) { svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) manager := svr.GetKeyspaceGroupManager() if manager == nil { - c.AbortWithStatusJSON(http.StatusInternalServerError, groupManagerUninitializedErr) + c.AbortWithStatusJSON(http.StatusInternalServerError, GroupManagerUninitializedErr) return } allocParams := &AllocNodesForKeyspaceGroupParams{} @@ -437,7 +438,7 @@ func SetNodesForKeyspaceGroup(c *gin.Context) { svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) manager := svr.GetKeyspaceGroupManager() if manager == nil { - c.AbortWithStatusJSON(http.StatusInternalServerError, groupManagerUninitializedErr) + c.AbortWithStatusJSON(http.StatusInternalServerError, GroupManagerUninitializedErr) return } setParams := &SetNodesForKeyspaceGroupParams{} @@ -493,7 +494,7 @@ func SetPriorityForKeyspaceGroup(c *gin.Context) { svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) manager := svr.GetKeyspaceGroupManager() if manager == nil { - c.AbortWithStatusJSON(http.StatusInternalServerError, groupManagerUninitializedErr) + c.AbortWithStatusJSON(http.StatusInternalServerError, GroupManagerUninitializedErr) return } setParams := &SetPriorityForKeyspaceGroupParams{} diff --git a/tests/pdctl/keyspace/keyspace_test.go b/tests/pdctl/keyspace/keyspace_test.go index 805a30e6f18..3ff755fe601 100644 --- a/tests/pdctl/keyspace/keyspace_test.go +++ b/tests/pdctl/keyspace/keyspace_test.go @@ -105,6 +105,35 @@ func TestKeyspace(t *testing.T) { re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayStartServerLoop")) } +// Show command should auto retry without refresh_group_id if keyspace group manager not initialized. +// See issue: #7441 +func TestKeyspaceGroupUninitialized(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayStartServerLoop", `return(true)`)) + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/skipSplitRegion", "return(true)")) + tc, err := tests.NewTestCluster(ctx, 1) + re.NoError(err) + re.NoError(tc.RunInitialServers()) + tc.WaitLeader() + re.NoError(tc.GetLeaderServer().BootstrapCluster()) + pdAddr := tc.GetConfig().GetClientURL() + + keyspaceName := "DEFAULT" + keyspaceID := uint32(0) + args := []string{"-u", pdAddr, "keyspace", "show", "name", keyspaceName} + output, err := pdctl.ExecuteCommand(pdctlCmd.GetRootCmd(), args...) 
+ re.NoError(err) + var meta api.KeyspaceMeta + re.NoError(json.Unmarshal(output, &meta)) + re.Equal(keyspaceName, meta.GetName()) + re.Equal(keyspaceID, meta.GetId()) + + re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayStartServerLoop")) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/skipSplitRegion")) +} + type keyspaceTestSuite struct { suite.Suite ctx context.Context diff --git a/tools/pd-ctl/pdctl/command/keyspace_command.go b/tools/pd-ctl/pdctl/command/keyspace_command.go index 7c0d3d78bf6..93a99abc39f 100644 --- a/tools/pd-ctl/pdctl/command/keyspace_command.go +++ b/tools/pd-ctl/pdctl/command/keyspace_command.go @@ -28,11 +28,12 @@ import ( const ( keyspacePrefix = "pd/api/v2/keyspaces" // flags - nmConfig = "config" - nmLimit = "limit" - nmPageToken = "page_token" - nmRemove = "remove" - nmUpdate = "update" + nmConfig = "config" + nmLimit = "limit" + nmPageToken = "page_token" + nmRemove = "remove" + nmUpdate = "update" + nmForceRefreshGroupID = "force_refresh_group_id" ) // NewKeyspaceCommand returns a keyspace subcommand of rootCmd. @@ -64,6 +65,7 @@ func newShowKeyspaceCommand() *cobra.Command { Short: "show keyspace metadata specified by keyspace name", Run: showKeyspaceNameCommandFunc, } + showByName.Flags().Bool(nmForceRefreshGroupID, true, "force refresh keyspace group id") r.AddCommand(showByID) r.AddCommand(showByName) return r @@ -87,7 +89,21 @@ func showKeyspaceNameCommandFunc(cmd *cobra.Command, args []string) { cmd.Usage() return } - resp, err := doRequest(cmd, fmt.Sprintf("%s/%s?force_refresh_group_id=true", keyspacePrefix, args[0]), http.MethodGet, http.Header{}) + refreshGroupID, err := cmd.Flags().GetBool(nmForceRefreshGroupID) + if err != nil { + cmd.PrintErrln("Failed to parse flag: ", err) + return + } + url := fmt.Sprintf("%s/%s", keyspacePrefix, args[0]) + if refreshGroupID { + url += "?force_refresh_group_id=true" + } + resp, err := doRequest(cmd, url, http.MethodGet, http.Header{}) + // Retry without the force_refresh_group_id if the keyspace group manager is not initialized. + // This can happen when PD is not running in API mode. + if err != nil && refreshGroupID && strings.Contains(err.Error(), handlers.GroupManagerUninitializedErr) { + resp, err = doRequest(cmd, fmt.Sprintf("%s/%s", keyspacePrefix, args[0]), http.MethodGet, http.Header{}) + } if err != nil { cmd.PrintErrln("Failed to get the keyspace information: ", err) return From 7fbe69d1ec7ffd3f4d4f4b1b8cd2f78f9f0d689a Mon Sep 17 00:00:00 2001 From: JmPotato Date: Wed, 29 Nov 2023 13:17:47 +0800 Subject: [PATCH 06/14] client/http: add GetHistoryHotRegions interface and more tests (#7453) ref tikv/pd#7300 - Add `GetHistoryHotRegions` interface. - Add more tests. 
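The client/http additions in this patch give callers typed wrappers over PD's HTTP API. The sketch below shows how they might be used once a Client has been constructed; the constructor is outside this diff, so the client is passed in, and the millisecond timestamps and leader-schedule-limit value mirror the accompanying test rather than a documented contract.

package example

import (
	"context"
	"fmt"
	"time"

	pdhttp "github.com/tikv/pd/client/http"
)

// dumpHotHistory sketches a caller of the interfaces added in this patch.
func dumpHotHistory(ctx context.Context, cli pdhttp.Client) error {
	// Ask for the last hour of history; the request fields come from the
	// HistoryHotRegionsRequest type added in client/http/types.go.
	req := &pdhttp.HistoryHotRegionsRequest{
		StartTime: time.Now().Add(-time.Hour).UnixMilli(),
		EndTime:   time.Now().UnixMilli(),
	}
	hist, err := cli.GetHistoryHotRegions(ctx, req)
	if err != nil {
		return err
	}
	for _, r := range hist.HistoryHotRegion {
		fmt.Printf("region %d on store %d: %.1f bytes/s\n", r.RegionID, r.StoreID, r.FlowBytes)
	}

	// The schedule config round-trips as a generic map.
	cfg, err := cli.GetScheduleConfig(ctx)
	if err != nil {
		return err
	}
	cfg["leader-schedule-limit"] = float64(8)
	return cli.SetScheduleConfig(ctx, cfg)
}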
Signed-off-by: JmPotato --- client/http/client.go | 61 +++++++++++++- client/http/types.go | 42 ++++++++++ tests/integrations/client/http_client_test.go | 79 +++++++++++++++++-- 3 files changed, 173 insertions(+), 9 deletions(-) diff --git a/client/http/client.go b/client/http/client.go index cd7be205c12..d15693e11d4 100644 --- a/client/http/client.go +++ b/client/http/client.go @@ -49,8 +49,12 @@ type Client interface { GetRegionsByStoreID(context.Context, uint64) (*RegionsInfo, error) GetHotReadRegions(context.Context) (*StoreHotPeersInfos, error) GetHotWriteRegions(context.Context) (*StoreHotPeersInfos, error) + GetHistoryHotRegions(context.Context, *HistoryHotRegionsRequest) (*HistoryHotRegions, error) GetRegionStatusByKeyRange(context.Context, *KeyRange) (*RegionStats, error) GetStores(context.Context) (*StoresInfo, error) + /* Config-related interfaces */ + GetScheduleConfig(context.Context) (map[string]interface{}, error) + SetScheduleConfig(context.Context, map[string]interface{}) error /* Rule-related interfaces */ GetAllPlacementRuleBundles(context.Context) ([]*GroupBundle, error) GetPlacementRuleBundleByGroup(context.Context, string) (*GroupBundle, error) @@ -191,12 +195,23 @@ func (c *client) execDuration(name string, duration time.Duration) { c.executionDuration.WithLabelValues(name).Observe(duration.Seconds()) } +// HeaderOption configures the HTTP header. +type HeaderOption func(header http.Header) + +// WithAllowFollowerHandle sets the header field to allow a PD follower to handle this request. +func WithAllowFollowerHandle() HeaderOption { + return func(header http.Header) { + header.Set("PD-Allow-Follower-Handle", "true") + } +} + // At present, we will use the retry strategy of polling by default to keep // it consistent with the current implementation of some clients (e.g. TiDB). func (c *client) requestWithRetry( ctx context.Context, name, uri, method string, body io.Reader, res interface{}, + headerOpts ...HeaderOption, ) error { var ( err error @@ -204,7 +219,7 @@ func (c *client) requestWithRetry( ) for idx := 0; idx < len(c.pdAddrs); idx++ { addr = c.pdAddrs[idx] - err = c.request(ctx, name, fmt.Sprintf("%s%s", addr, uri), method, body, res) + err = c.request(ctx, name, fmt.Sprintf("%s%s", addr, uri), method, body, res, headerOpts...) if err == nil { break } @@ -218,6 +233,7 @@ func (c *client) request( ctx context.Context, name, url, method string, body io.Reader, res interface{}, + headerOpts ...HeaderOption, ) error { logFields := []zap.Field{ zap.String("name", name), @@ -229,6 +245,9 @@ func (c *client) request( log.Error("[pd] create http request failed", append(logFields, zap.Error(err))...) return errors.Trace(err) } + for _, opt := range headerOpts { + opt(req.Header) + } start := time.Now() resp, err := c.cli.Do(req) if err != nil { @@ -361,6 +380,23 @@ func (c *client) GetHotWriteRegions(ctx context.Context) (*StoreHotPeersInfos, e return &hotWriteRegions, nil } +// GetHistoryHotRegions gets the history hot region statistics info. 
+func (c *client) GetHistoryHotRegions(ctx context.Context, req *HistoryHotRegionsRequest) (*HistoryHotRegions, error) { + reqJSON, err := json.Marshal(req) + if err != nil { + return nil, errors.Trace(err) + } + var historyHotRegions HistoryHotRegions + err = c.requestWithRetry(ctx, + "GetHistoryHotRegions", HotHistory, + http.MethodGet, bytes.NewBuffer(reqJSON), &historyHotRegions, + WithAllowFollowerHandle()) + if err != nil { + return nil, err + } + return &historyHotRegions, nil +} + // GetRegionStatusByKeyRange gets the region status by key range. // The keys in the key range should be encoded in the UTF-8 bytes format. func (c *client) GetRegionStatusByKeyRange(ctx context.Context, keyRange *KeyRange) (*RegionStats, error) { @@ -375,6 +411,29 @@ func (c *client) GetRegionStatusByKeyRange(ctx context.Context, keyRange *KeyRan return ®ionStats, nil } +// GetScheduleConfig gets the schedule configurations. +func (c *client) GetScheduleConfig(ctx context.Context) (map[string]interface{}, error) { + var config map[string]interface{} + err := c.requestWithRetry(ctx, + "GetScheduleConfig", ScheduleConfig, + http.MethodGet, http.NoBody, &config) + if err != nil { + return nil, err + } + return config, nil +} + +// SetScheduleConfig sets the schedule configurations. +func (c *client) SetScheduleConfig(ctx context.Context, config map[string]interface{}) error { + configJSON, err := json.Marshal(config) + if err != nil { + return errors.Trace(err) + } + return c.requestWithRetry(ctx, + "SetScheduleConfig", ScheduleConfig, + http.MethodPost, bytes.NewBuffer(configJSON), nil) +} + // GetStores gets the stores info. func (c *client) GetStores(ctx context.Context) (*StoresInfo, error) { var stores StoresInfo diff --git a/client/http/types.go b/client/http/types.go index df448e7e20d..4e99d911e0b 100644 --- a/client/http/types.go +++ b/client/http/types.go @@ -19,6 +19,8 @@ import ( "encoding/json" "net/url" "time" + + "github.com/pingcap/kvproto/pkg/encryptionpb" ) // KeyRange defines a range of keys in bytes. @@ -166,6 +168,46 @@ type HotPeerStatShow struct { LastUpdateTime time.Time `json:"last_update_time,omitempty"` } +// HistoryHotRegionsRequest wrap the request conditions. +type HistoryHotRegionsRequest struct { + StartTime int64 `json:"start_time,omitempty"` + EndTime int64 `json:"end_time,omitempty"` + RegionIDs []uint64 `json:"region_ids,omitempty"` + StoreIDs []uint64 `json:"store_ids,omitempty"` + PeerIDs []uint64 `json:"peer_ids,omitempty"` + IsLearners []bool `json:"is_learners,omitempty"` + IsLeaders []bool `json:"is_leaders,omitempty"` + HotRegionTypes []string `json:"hot_region_type,omitempty"` +} + +// HistoryHotRegions wraps historyHotRegion +type HistoryHotRegions struct { + HistoryHotRegion []*HistoryHotRegion `json:"history_hot_region"` +} + +// HistoryHotRegion wraps hot region info +// it is storage format of hot_region_storage +type HistoryHotRegion struct { + UpdateTime int64 `json:"update_time"` + RegionID uint64 `json:"region_id"` + PeerID uint64 `json:"peer_id"` + StoreID uint64 `json:"store_id"` + IsLeader bool `json:"is_leader"` + IsLearner bool `json:"is_learner"` + HotRegionType string `json:"hot_region_type"` + HotDegree int64 `json:"hot_degree"` + FlowBytes float64 `json:"flow_bytes"` + KeyRate float64 `json:"key_rate"` + QueryRate float64 `json:"query_rate"` + StartKey string `json:"start_key"` + EndKey string `json:"end_key"` + // Encryption metadata for start_key and end_key. encryption_meta.iv is IV for start_key. 
+ // IV for end_key is calculated from (encryption_meta.iv + len(start_key)). + // The field is only used by PD and should be ignored otherwise. + // If encryption_meta is empty (i.e. nil), it means start_key and end_key are unencrypted. + EncryptionMeta *encryptionpb.EncryptionMeta `json:"encryption_meta,omitempty"` +} + // StoresInfo represents the information of all TiKV/TiFlash stores. type StoresInfo struct { Count int `json:"count"` diff --git a/tests/integrations/client/http_client_test.go b/tests/integrations/client/http_client_test.go index 3a52e91e1f8..a007b893187 100644 --- a/tests/integrations/client/http_client_test.go +++ b/tests/integrations/client/http_client_test.go @@ -55,8 +55,16 @@ func (suite *httpClientTestSuite) SetupSuite() { re.NoError(err) leader := suite.cluster.WaitLeader() re.NotEmpty(leader) - err = suite.cluster.GetLeaderServer().BootstrapCluster() + leaderServer := suite.cluster.GetLeaderServer() + err = leaderServer.BootstrapCluster() re.NoError(err) + for _, region := range []*core.RegionInfo{ + core.NewTestRegionInfo(10, 1, []byte("a1"), []byte("a2")), + core.NewTestRegionInfo(11, 1, []byte("a2"), []byte("a3")), + } { + err := leaderServer.GetRaftCluster().HandleRegionHeartbeat(region) + re.NoError(err) + } var ( testServers = suite.cluster.GetServers() endpoints = make([]string, 0, len(testServers)) @@ -73,6 +81,53 @@ func (suite *httpClientTestSuite) TearDownSuite() { suite.cluster.Destroy() } +func (suite *httpClientTestSuite) TestMeta() { + re := suite.Require() + region, err := suite.client.GetRegionByID(suite.ctx, 10) + re.NoError(err) + re.Equal(int64(10), region.ID) + re.Equal(core.HexRegionKeyStr([]byte("a1")), region.StartKey) + re.Equal(core.HexRegionKeyStr([]byte("a2")), region.EndKey) + region, err = suite.client.GetRegionByKey(suite.ctx, []byte("a2")) + re.NoError(err) + re.Equal(int64(11), region.ID) + re.Equal(core.HexRegionKeyStr([]byte("a2")), region.StartKey) + re.Equal(core.HexRegionKeyStr([]byte("a3")), region.EndKey) + regions, err := suite.client.GetRegions(suite.ctx) + re.NoError(err) + re.Equal(int64(2), regions.Count) + re.Len(regions.Regions, 2) + regions, err = suite.client.GetRegionsByKeyRange(suite.ctx, pd.NewKeyRange([]byte("a1"), []byte("a3")), -1) + re.NoError(err) + re.Equal(int64(2), regions.Count) + re.Len(regions.Regions, 2) + regions, err = suite.client.GetRegionsByStoreID(suite.ctx, 1) + re.NoError(err) + re.Equal(int64(2), regions.Count) + re.Len(regions.Regions, 2) + regionStats, err := suite.client.GetRegionStatusByKeyRange(suite.ctx, pd.NewKeyRange([]byte("a1"), []byte("a3"))) + re.NoError(err) + re.Equal(2, regionStats.Count) + hotReadRegions, err := suite.client.GetHotReadRegions(suite.ctx) + re.NoError(err) + re.Len(hotReadRegions.AsPeer, 1) + re.Len(hotReadRegions.AsLeader, 1) + hotWriteRegions, err := suite.client.GetHotWriteRegions(suite.ctx) + re.NoError(err) + re.Len(hotWriteRegions.AsPeer, 1) + re.Len(hotWriteRegions.AsLeader, 1) + historyHorRegions, err := suite.client.GetHistoryHotRegions(suite.ctx, &pd.HistoryHotRegionsRequest{ + StartTime: 0, + EndTime: time.Now().AddDate(0, 0, 1).UnixNano() / int64(time.Millisecond), + }) + re.NoError(err) + re.Len(historyHorRegions.HistoryHotRegion, 0) + store, err := suite.client.GetStores(suite.ctx) + re.NoError(err) + re.Equal(1, store.Count) + re.Len(store.Stores, 1) +} + func (suite *httpClientTestSuite) TestGetMinResolvedTSByStoresIDs() { re := suite.Require() testMinResolvedTS := tsoutil.TimeToTS(time.Now()) @@ -271,13 +326,6 @@ func (suite 
*httpClientTestSuite) TestRegionLabel() { func (suite *httpClientTestSuite) TestAccelerateSchedule() { re := suite.Require() raftCluster := suite.cluster.GetLeaderServer().GetRaftCluster() - for _, region := range []*core.RegionInfo{ - core.NewTestRegionInfo(10, 1, []byte("a1"), []byte("a2")), - core.NewTestRegionInfo(11, 1, []byte("a2"), []byte("a3")), - } { - err := raftCluster.HandleRegionHeartbeat(region) - re.NoError(err) - } suspectRegions := raftCluster.GetSuspectRegions() re.Len(suspectRegions, 0) err := suite.client.AccelerateSchedule(suite.ctx, pd.NewKeyRange([]byte("a1"), []byte("a2"))) @@ -295,3 +343,18 @@ func (suite *httpClientTestSuite) TestAccelerateSchedule() { suspectRegions = raftCluster.GetSuspectRegions() re.Len(suspectRegions, 2) } + +func (suite *httpClientTestSuite) TestScheduleConfig() { + re := suite.Require() + config, err := suite.client.GetScheduleConfig(suite.ctx) + re.NoError(err) + re.Equal(float64(4), config["leader-schedule-limit"]) + re.Equal(float64(2048), config["region-schedule-limit"]) + config["leader-schedule-limit"] = float64(8) + err = suite.client.SetScheduleConfig(suite.ctx, config) + re.NoError(err) + config, err = suite.client.GetScheduleConfig(suite.ctx) + re.NoError(err) + re.Equal(float64(8), config["leader-schedule-limit"]) + re.Equal(float64(2048), config["region-schedule-limit"]) +} From 90245e376b156a6aa2c41c6063f093bd63561dc4 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 29 Nov 2023 14:55:20 +0800 Subject: [PATCH 07/14] mcs: fix unstable panic in tso test (#7433) close tikv/pd#7429 Signed-off-by: lhy1024 --- pkg/tso/keyspace_group_manager_test.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/pkg/tso/keyspace_group_manager_test.go b/pkg/tso/keyspace_group_manager_test.go index c20abfc5f79..0c1b017d7aa 100644 --- a/pkg/tso/keyspace_group_manager_test.go +++ b/pkg/tso/keyspace_group_manager_test.go @@ -1224,14 +1224,12 @@ func waitForPrimariesServing( re *require.Assertions, mgrs []*KeyspaceGroupManager, ids []uint32, ) { testutil.Eventually(re, func() bool { - for i := 0; i < 100; i++ { - for j, id := range ids { - if member, err := mgrs[j].GetElectionMember(id, id); err != nil || !member.IsLeader() { - return false - } - if _, _, err := mgrs[j].HandleTSORequest(mgrs[j].ctx, id, id, GlobalDCLocation, 1); err != nil { - return false - } + for j, id := range ids { + if member, err := mgrs[j].GetElectionMember(id, id); err != nil || member == nil || !member.IsLeader() { + return false + } + if _, _, err := mgrs[j].HandleTSORequest(mgrs[j].ctx, id, id, GlobalDCLocation, 1); err != nil { + return false } } return true From 1934450e9aa04c7aaf291110bf23a5995f9c94fd Mon Sep 17 00:00:00 2001 From: Yongbo Jiang Date: Wed, 29 Nov 2023 15:38:48 +0800 Subject: [PATCH 08/14] client: add follower option (#7465) ref tikv/pd#7431 Signed-off-by: Cabinfever_B Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- client/client.go | 22 +++++++++++++++++----- client/option.go | 17 +++++++++++++++++ client/option_test.go | 8 ++++++++ 3 files changed, 42 insertions(+), 5 deletions(-) diff --git a/client/client.go b/client/client.go index 2d30d9fb6c4..70de1322488 100644 --- a/client/client.go +++ b/client/client.go @@ -91,7 +91,7 @@ type Client interface { // client should retry later. GetRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error) // GetRegionFromMember gets a region from certain members. 
- GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string) (*Region, error) + GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, opts ...GetRegionOption) (*Region, error) // GetPrevRegion gets the previous region and its leader Peer of the region where the key is located. GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error) // GetRegionByID gets a region and its leader Peer from PD by id. @@ -100,7 +100,7 @@ type Client interface { // Limit limits the maximum number of regions returned. // If a region has no leader, corresponding leader will be placed by a peer // with empty value (PeerID is 0). - ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*Region, error) + ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...GetRegionOption) ([]*Region, error) // GetStore gets a store from PD by store id. // The store may expire later. Caller is responsible for caching and taking care // of store change. @@ -200,7 +200,8 @@ func WithSkipStoreLimit() RegionsOption { // GetRegionOp represents available options when getting regions. type GetRegionOp struct { - needBuckets bool + needBuckets bool + allowFollowerHandle bool } // GetRegionOption configures GetRegionOp. @@ -211,6 +212,11 @@ func WithBuckets() GetRegionOption { return func(op *GetRegionOp) { op.needBuckets = true } } +// WithAllowFollowerHandle means that client can send request to follower and let it handle this request. +func WithAllowFollowerHandle() GetRegionOption { + return func(op *GetRegionOp) { op.allowFollowerHandle = true } +} + // LeaderHealthCheckInterval might be changed in the unit to shorten the testing time. var LeaderHealthCheckInterval = time.Second @@ -701,6 +707,12 @@ func (c *client) UpdateOption(option DynamicOption, value interface{}) error { return errors.New("[pd] invalid value type for EnableTSOFollowerProxy option, it should be bool") } c.option.setEnableTSOFollowerProxy(enable) + case EnableFollowerHandle: + enable, ok := value.(bool) + if !ok { + return errors.New("[pd] invalid value type for EnableFollowerHandle option, it should be bool") + } + c.option.setEnableFollowerHandle(enable) default: return errors.New("[pd] unsupported client option") } @@ -952,7 +964,7 @@ func isNetworkError(code codes.Code) bool { return code == codes.Unavailable || code == codes.DeadlineExceeded } -func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string) (*Region, error) { +func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, opts ...GetRegionOption) (*Region, error) { if span := opentracing.SpanFromContext(ctx); span != nil { span = opentracing.StartSpan("pdclient.GetRegionFromMember", opentracing.ChildOf(span.Context())) defer span.Finish() @@ -1056,7 +1068,7 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...Get return handleRegionResponse(resp), nil } -func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*Region, error) { +func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...GetRegionOption) ([]*Region, error) { if span := opentracing.SpanFromContext(ctx); span != nil { span = opentracing.StartSpan("pdclient.ScanRegions", opentracing.ChildOf(span.Context())) defer span.Finish() diff --git a/client/option.go b/client/option.go index d6a6d61d2f9..2a6c285cfb7 100644 --- a/client/option.go +++ b/client/option.go @@ -28,6 +28,7 @@ const ( 
maxInitClusterRetries = 100 defaultMaxTSOBatchWaitInterval time.Duration = 0 defaultEnableTSOFollowerProxy = false + defaultEnableFollowerHandle = false ) // DynamicOption is used to distinguish the dynamic option type. @@ -40,6 +41,8 @@ const ( // EnableTSOFollowerProxy is the TSO Follower Proxy option. // It is stored as bool. EnableTSOFollowerProxy + // EnableFollowerHandle is the follower handle option. + EnableFollowerHandle dynamicOptionCount ) @@ -72,6 +75,7 @@ func newOption() *option { co.dynamicOptions[MaxTSOBatchWaitInterval].Store(defaultMaxTSOBatchWaitInterval) co.dynamicOptions[EnableTSOFollowerProxy].Store(defaultEnableTSOFollowerProxy) + co.dynamicOptions[EnableFollowerHandle].Store(defaultEnableFollowerHandle) return co } @@ -88,6 +92,19 @@ func (o *option) setMaxTSOBatchWaitInterval(interval time.Duration) error { return nil } +// setEnableFollowerHandle set the Follower Handle option. +func (o *option) setEnableFollowerHandle(enable bool) { + old := o.getEnableFollowerHandle() + if enable != old { + o.dynamicOptions[EnableFollowerHandle].Store(enable) + } +} + +// getMaxTSOBatchWaitInterval gets the Follower Handle enable option. +func (o *option) getEnableFollowerHandle() bool { + return o.dynamicOptions[EnableFollowerHandle].Load().(bool) +} + // getMaxTSOBatchWaitInterval gets the max TSO batch wait interval option. func (o *option) getMaxTSOBatchWaitInterval() time.Duration { return o.dynamicOptions[MaxTSOBatchWaitInterval].Load().(time.Duration) diff --git a/client/option_test.go b/client/option_test.go index 1b5604f4d19..1a8faf8fcd9 100644 --- a/client/option_test.go +++ b/client/option_test.go @@ -28,6 +28,7 @@ func TestDynamicOptionChange(t *testing.T) { // Check the default value setting. re.Equal(defaultMaxTSOBatchWaitInterval, o.getMaxTSOBatchWaitInterval()) re.Equal(defaultEnableTSOFollowerProxy, o.getEnableTSOFollowerProxy()) + re.Equal(defaultEnableFollowerHandle, o.getEnableFollowerHandle()) // Check the invalid value setting. re.NotNil(o.setMaxTSOBatchWaitInterval(time.Second)) @@ -55,4 +56,11 @@ func TestDynamicOptionChange(t *testing.T) { close(o.enableTSOFollowerProxyCh) // Setting the same value should not notify the channel. 
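PATCH 08 stores EnableFollowerHandle alongside the other dynamic options and lets callers toggle it through UpdateOption, while WithAllowFollowerHandle marks individual region requests as acceptable to serve from a follower. A short usage sketch follows; the endpoint address is a placeholder, and whether a follower actually answers still depends on server-side support.

package main

import (
	"context"
	"fmt"

	pd "github.com/tikv/pd/client"
)

func main() {
	ctx := context.Background()
	// Placeholder endpoint; in a real deployment this is the PD address list.
	cli, err := pd.NewClientWithContext(ctx, []string{"http://127.0.0.1:2379"}, pd.SecurityOption{})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	// Flip the new dynamic option at runtime; like the other dynamic options
	// it is stored atomically, so this is safe while requests are in flight.
	if err := cli.UpdateOption(pd.EnableFollowerHandle, true); err != nil {
		panic(err)
	}

	// Per-request opt-in: this lookup may be answered by a follower.
	region, err := cli.GetRegion(ctx, []byte("a"), pd.WithAllowFollowerHandle())
	if err != nil {
		panic(err)
	}
	if region != nil {
		fmt.Println("region id:", region.Meta.GetId())
	}
}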
o.setEnableTSOFollowerProxy(expectBool) + + expectBool = true + o.setEnableFollowerHandle(expectBool) + re.Equal(expectBool, o.getEnableFollowerHandle()) + expectBool = false + o.setEnableFollowerHandle(expectBool) + re.Equal(expectBool, o.getEnableFollowerHandle()) } From 54bf70e45e67fe4fb576409740270fe49af98ad6 Mon Sep 17 00:00:00 2001 From: Yongbo Jiang Date: Wed, 29 Nov 2023 17:52:19 +0800 Subject: [PATCH 09/14] client: update the leader even if the connection creation fails (#7443) close tikv/pd#7416 Signed-off-by: Cabinfever_B Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- client/client.go | 4 +- client/grpcutil/grpcutil.go | 8 ++ client/pd_service_discovery.go | 1 - client/tso_client.go | 3 +- client/tso_dispatcher.go | 82 +++++++++--------- tests/integrations/client/client_test.go | 101 ++++++++++++++++++++++- 6 files changed, 156 insertions(+), 43 deletions(-) diff --git a/client/client.go b/client/client.go index 70de1322488..f34f5897013 100644 --- a/client/client.go +++ b/client/client.go @@ -744,16 +744,18 @@ func (c *client) checkLeaderHealth(ctx context.Context) { if client := c.pdSvcDiscovery.GetServingEndpointClientConn(); client != nil { healthCli := healthpb.NewHealthClient(client) resp, err := healthCli.Check(ctx, &healthpb.HealthCheckRequest{Service: ""}) - rpcErr, ok := status.FromError(err) failpoint.Inject("unreachableNetwork1", func() { resp = nil err = status.New(codes.Unavailable, "unavailable").Err() }) + rpcErr, ok := status.FromError(err) if (ok && isNetworkError(rpcErr.Code())) || resp.GetStatus() != healthpb.HealthCheckResponse_SERVING { atomic.StoreInt32(&(c.leaderNetworkFailure), int32(1)) } else { atomic.StoreInt32(&(c.leaderNetworkFailure), int32(0)) } + } else { + atomic.StoreInt32(&(c.leaderNetworkFailure), int32(1)) } } diff --git a/client/grpcutil/grpcutil.go b/client/grpcutil/grpcutil.go index 125f1125721..fe149e76ecc 100644 --- a/client/grpcutil/grpcutil.go +++ b/client/grpcutil/grpcutil.go @@ -21,6 +21,8 @@ import ( "sync" "time" + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/tikv/pd/client/errs" "github.com/tikv/pd/client/tlsutil" @@ -88,6 +90,12 @@ func GetOrCreateGRPCConn(ctx context.Context, clientConns *sync.Map, addr string dCtx, cancel := context.WithTimeout(ctx, dialTimeout) defer cancel() cc, err := GetClientConn(dCtx, addr, tlsConfig, opt...) 
+ failpoint.Inject("unreachableNetwork2", func(val failpoint.Value) { + if val, ok := val.(string); ok && val == addr { + cc = nil + err = errors.Errorf("unreachable network") + } + }) if err != nil { return nil, err } diff --git a/client/pd_service_discovery.go b/client/pd_service_discovery.go index 98ddd611326..b75276adbe9 100644 --- a/client/pd_service_discovery.go +++ b/client/pd_service_discovery.go @@ -614,7 +614,6 @@ func (c *pdServiceDiscovery) switchLeader(addrs []string) error { if _, err := c.GetOrCreateGRPCConn(addr); err != nil { log.Warn("[pd] failed to connect leader", zap.String("leader", addr), errs.ZapError(err)) - return err } // Set PD leader and Global TSO Allocator (which is also the PD leader) c.leader.Store(addr) diff --git a/client/tso_client.go b/client/tso_client.go index 35d9388c72b..fc38ee8e5ba 100644 --- a/client/tso_client.go +++ b/client/tso_client.go @@ -171,9 +171,10 @@ func (c *tsoClient) GetTSOAllocatorClientConnByDCLocation(dcLocation string) (*g if !ok { panic(fmt.Sprintf("the allocator leader in %s should exist", dcLocation)) } + // todo: if we support local tso forward, we should get or create client conns. cc, ok := c.svcDiscovery.GetClientConns().Load(url) if !ok { - panic(fmt.Sprintf("the client connection of %s in %s should exist", url, dcLocation)) + return nil, url.(string) } return cc.(*grpc.ClientConn), url.(string) } diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index e4c5bf3c77a..0de4dc3a49e 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -254,7 +254,7 @@ func (c *tsoClient) checkAllocator( requestForwarded.WithLabelValues(forwardedHostTrim, addrTrim).Set(0) }() cc, u := c.GetTSOAllocatorClientConnByDCLocation(dc) - healthCli := healthpb.NewHealthClient(cc) + var healthCli healthpb.HealthClient ticker := time.NewTicker(time.Second) defer ticker.Stop() for { @@ -263,20 +263,25 @@ func (c *tsoClient) checkAllocator( log.Info("[tso] the leader of the allocator leader is changed", zap.String("dc", dc), zap.String("origin", url), zap.String("new", u)) return } - healthCtx, healthCancel := context.WithTimeout(dispatcherCtx, c.option.timeout) - resp, err := healthCli.Check(healthCtx, &healthpb.HealthCheckRequest{Service: ""}) - failpoint.Inject("unreachableNetwork", func() { - resp.Status = healthpb.HealthCheckResponse_UNKNOWN - }) - healthCancel() - if err == nil && resp.GetStatus() == healthpb.HealthCheckResponse_SERVING { - // create a stream of the original allocator - cctx, cancel := context.WithCancel(dispatcherCtx) - stream, err := c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.timeout) - if err == nil && stream != nil { - log.Info("[tso] recover the original tso stream since the network has become normal", zap.String("dc", dc), zap.String("url", url)) - updateAndClear(url, &tsoConnectionContext{url, stream, cctx, cancel}) - return + if healthCli == nil && cc != nil { + healthCli = healthpb.NewHealthClient(cc) + } + if healthCli != nil { + healthCtx, healthCancel := context.WithTimeout(dispatcherCtx, c.option.timeout) + resp, err := healthCli.Check(healthCtx, &healthpb.HealthCheckRequest{Service: ""}) + failpoint.Inject("unreachableNetwork", func() { + resp.Status = healthpb.HealthCheckResponse_UNKNOWN + }) + healthCancel() + if err == nil && resp.GetStatus() == healthpb.HealthCheckResponse_SERVING { + // create a stream of the original allocator + cctx, cancel := context.WithCancel(dispatcherCtx) + stream, err := c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, 
c.option.timeout) + if err == nil && stream != nil { + log.Info("[tso] recover the original tso stream since the network has become normal", zap.String("dc", dc), zap.String("url", url)) + updateAndClear(url, &tsoConnectionContext{url, stream, cctx, cancel}) + return + } } } select { @@ -285,7 +290,7 @@ func (c *tsoClient) checkAllocator( case <-ticker.C: // To ensure we can get the latest allocator leader // and once the leader is changed, we can exit this function. - _, u = c.GetTSOAllocatorClientConnByDCLocation(dc) + cc, u = c.GetTSOAllocatorClientConnByDCLocation(dc) } } } @@ -597,29 +602,32 @@ func (c *tsoClient) tryConnectToTSO( for i := 0; i < maxRetryTimes; i++ { c.svcDiscovery.ScheduleCheckMemberChanged() cc, url = c.GetTSOAllocatorClientConnByDCLocation(dc) - cctx, cancel := context.WithCancel(dispatcherCtx) - stream, err = c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.timeout) - failpoint.Inject("unreachableNetwork", func() { - stream = nil - err = status.New(codes.Unavailable, "unavailable").Err() - }) - if stream != nil && err == nil { - updateAndClear(url, &tsoConnectionContext{url, stream, cctx, cancel}) - return nil - } - - if err != nil && c.option.enableForwarding { - // The reason we need to judge if the error code is equal to "Canceled" here is that - // when we create a stream we use a goroutine to manually control the timeout of the connection. - // There is no need to wait for the transport layer timeout which can reduce the time of unavailability. - // But it conflicts with the retry mechanism since we use the error code to decide if it is caused by network error. - // And actually the `Canceled` error can be regarded as a kind of network error in some way. - if rpcErr, ok := status.FromError(err); ok && (isNetworkError(rpcErr.Code()) || rpcErr.Code() == codes.Canceled) { - networkErrNum++ + if cc != nil { + cctx, cancel := context.WithCancel(dispatcherCtx) + stream, err = c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.timeout) + failpoint.Inject("unreachableNetwork", func() { + stream = nil + err = status.New(codes.Unavailable, "unavailable").Err() + }) + if stream != nil && err == nil { + updateAndClear(url, &tsoConnectionContext{url, stream, cctx, cancel}) + return nil } - } - cancel() + if err != nil && c.option.enableForwarding { + // The reason we need to judge if the error code is equal to "Canceled" here is that + // when we create a stream we use a goroutine to manually control the timeout of the connection. + // There is no need to wait for the transport layer timeout which can reduce the time of unavailability. + // But it conflicts with the retry mechanism since we use the error code to decide if it is caused by network error. + // And actually the `Canceled` error can be regarded as a kind of network error in some way. 
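PATCH 09's retry paths hinge on how gRPC failures are classified: isNetworkError treats Unavailable and DeadlineExceeded as network errors, and the stream-creation path additionally counts Canceled because the client cancels the dial from its own timeout goroutine. A standalone sketch of that classification; the second helper's name is illustrative.

package main

import (
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// isNetworkError mirrors the helper in the client: these codes mean the peer
// could not be reached in time, so the caller should try another member.
func isNetworkError(code codes.Code) bool {
	return code == codes.Unavailable || code == codes.DeadlineExceeded
}

// shouldCountAsNetworkFailure is the widened check used when building a TSO
// stream: a manually cancelled dial is treated like a network failure too.
func shouldCountAsNetworkFailure(err error) bool {
	if err == nil {
		return false
	}
	if s, ok := status.FromError(err); ok {
		return isNetworkError(s.Code()) || s.Code() == codes.Canceled
	}
	return false
}

func main() {
	errs := []error{
		status.Error(codes.Unavailable, "unavailable"),
		status.Error(codes.Canceled, "dial cancelled by client timeout"),
		status.Error(codes.PermissionDenied, "not a network problem"),
	}
	for _, err := range errs {
		fmt.Printf("%v -> network failure: %v\n", err, shouldCountAsNetworkFailure(err))
	}
}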
+ if rpcErr, ok := status.FromError(err); ok && (isNetworkError(rpcErr.Code()) || rpcErr.Code() == codes.Canceled) { + networkErrNum++ + } + } + cancel() + } else { + networkErrNum++ + } select { case <-dispatcherCtx.Done(): return err diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index 3834d9b53bf..bb4d6851fd0 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -518,7 +518,7 @@ func TestCustomTimeout(t *testing.T) { re.Less(time.Since(start), 2*time.Second) } -func TestGetRegionFromFollowerClient(t *testing.T) { +func TestGetRegionByFollowerForwarding(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -544,7 +544,7 @@ func TestGetRegionFromFollowerClient(t *testing.T) { } // case 1: unreachable -> normal -func TestGetTsoFromFollowerClient1(t *testing.T) { +func TestGetTsoByFollowerForwarding1(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -575,7 +575,7 @@ func TestGetTsoFromFollowerClient1(t *testing.T) { } // case 2: unreachable -> leader transfer -> normal -func TestGetTsoFromFollowerClient2(t *testing.T) { +func TestGetTsoByFollowerForwarding2(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -609,6 +609,101 @@ func TestGetTsoFromFollowerClient2(t *testing.T) { checkTS(re, cli, lastTS) } +// case 3: network partition between client and follower A -> transfer leader to follower A -> normal +func TestGetTsoAndRegionByFollowerForwarding(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + pd.LeaderHealthCheckInterval = 100 * time.Millisecond + cluster, err := tests.NewTestCluster(ctx, 3) + re.NoError(err) + defer cluster.Destroy() + + endpoints := runServer(re, cluster) + re.NotEmpty(cluster.WaitLeader()) + leader := cluster.GetLeaderServer() + grpcPDClient := testutil.MustNewGrpcClient(re, leader.GetAddr()) + testutil.Eventually(re, func() bool { + regionHeartbeat, err := grpcPDClient.RegionHeartbeat(ctx) + re.NoError(err) + regionID := regionIDAllocator.alloc() + region := &metapb.Region{ + Id: regionID, + RegionEpoch: &metapb.RegionEpoch{ + ConfVer: 1, + Version: 1, + }, + Peers: peers, + } + req := &pdpb.RegionHeartbeatRequest{ + Header: newHeader(leader.GetServer()), + Region: region, + Leader: peers[0], + } + err = regionHeartbeat.Send(req) + re.NoError(err) + _, err = regionHeartbeat.Recv() + return err == nil + }) + follower := cluster.GetServer(cluster.GetFollower()) + re.NoError(failpoint.Enable("github.com/tikv/pd/client/grpcutil/unreachableNetwork2", fmt.Sprintf("return(\"%s\")", follower.GetAddr()))) + + cli := setupCli(re, ctx, endpoints, pd.WithForwardingOption(true)) + var lastTS uint64 + testutil.Eventually(re, func() bool { + physical, logical, err := cli.GetTS(context.TODO()) + if err == nil { + lastTS = tsoutil.ComposeTS(physical, logical) + return true + } + t.Log(err) + return false + }) + lastTS = checkTS(re, cli, lastTS) + r, err := cli.GetRegion(context.Background(), []byte("a")) + re.NoError(err) + re.NotNil(r) + leader.GetServer().GetMember().ResignEtcdLeader(leader.GetServer().Context(), + leader.GetServer().Name(), follower.GetServer().Name()) + re.NotEmpty(cluster.WaitLeader()) + testutil.Eventually(re, func() bool { + physical, logical, err := cli.GetTS(context.TODO()) + if err == nil { + lastTS = 
tsoutil.ComposeTS(physical, logical) + return true + } + t.Log(err) + return false + }) + lastTS = checkTS(re, cli, lastTS) + testutil.Eventually(re, func() bool { + r, err = cli.GetRegion(context.Background(), []byte("a")) + if err == nil && r != nil { + return true + } + return false + }) + + re.NoError(failpoint.Disable("github.com/tikv/pd/client/grpcutil/unreachableNetwork2")) + testutil.Eventually(re, func() bool { + physical, logical, err := cli.GetTS(context.TODO()) + if err == nil { + lastTS = tsoutil.ComposeTS(physical, logical) + return true + } + t.Log(err) + return false + }) + lastTS = checkTS(re, cli, lastTS) + testutil.Eventually(re, func() bool { + r, err = cli.GetRegion(context.Background(), []byte("a")) + if err == nil && r != nil { + return true + } + return false + }) +} + func checkTS(re *require.Assertions, cli pd.Client, lastTS uint64) uint64 { for i := 0; i < tsoRequestRound; i++ { physical, logical, err := cli.GetTS(context.TODO()) From 180ff57afe62c1391e3edace8a15336f4c139417 Mon Sep 17 00:00:00 2001 From: Yongbo Jiang Date: Wed, 29 Nov 2023 18:05:48 +0800 Subject: [PATCH 10/14] client: avoid to add redundant grpc metadata (#7471) close tikv/pd#7469 Signed-off-by: Cabinfever_B Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- client/client.go | 130 ++++++++++++++++++-------------------- client/gc_client.go | 10 +-- client/keyspace_client.go | 4 -- 3 files changed, 64 insertions(+), 80 deletions(-) diff --git a/client/client.go b/client/client.go index f34f5897013..b320be6d3d5 100644 --- a/client/client.go +++ b/client/client.go @@ -765,8 +765,7 @@ func (c *client) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) { ctx, cancel := context.WithTimeout(ctx, c.option.timeout) req := &pdpb.GetMembersRequest{Header: c.requestHeader()} - ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) - protoClient := c.getClient() + protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { cancel() return nil, errs.ErrClientGetProtoClient @@ -825,6 +824,17 @@ func (c *client) getClient() pdpb.PDClient { return c.leaderClient() } +func (c *client) getClientAndContext(ctx context.Context) (pdpb.PDClient, context.Context) { + if c.option.enableForwarding && atomic.LoadInt32(&c.leaderNetworkFailure) == 1 { + backupClientConn, addr := c.backupClientConn() + if backupClientConn != nil { + log.Debug("[pd] use follower client", zap.String("addr", addr)) + return pdpb.NewPDClient(backupClientConn), grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) + } + } + return c.leaderClient(), ctx +} + func (c *client) GetTSAsync(ctx context.Context) TSFuture { return c.GetLocalTSAsync(ctx, globalDCLocation) } @@ -929,39 +939,6 @@ func handleRegionResponse(res *pdpb.GetRegionResponse) *Region { return r } -func (c *client) GetRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error) { - if span := opentracing.SpanFromContext(ctx); span != nil { - span = opentracing.StartSpan("pdclient.GetRegion", opentracing.ChildOf(span.Context())) - defer span.Finish() - } - start := time.Now() - defer func() { cmdDurationGetRegion.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.option.timeout) - - options := &GetRegionOp{} - for _, opt := range opts { - opt(options) - } - req := &pdpb.GetRegionRequest{ - Header: c.requestHeader(), - RegionKey: key, - NeedBuckets: options.needBuckets, - } - ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) - protoClient := c.getClient() - if 
protoClient == nil { - cancel() - return nil, errs.ErrClientGetProtoClient - } - resp, err := protoClient.GetRegion(ctx, req) - cancel() - - if err = c.respForErr(cmdFailDurationGetRegion, start, err, resp.GetHeader()); err != nil { - return nil, err - } - return handleRegionResponse(resp), nil -} - func isNetworkError(code codes.Code) bool { return code == codes.Unavailable || code == codes.DeadlineExceeded } @@ -1004,6 +981,38 @@ func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs return handleRegionResponse(resp), nil } +func (c *client) GetRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error) { + if span := opentracing.SpanFromContext(ctx); span != nil { + span = opentracing.StartSpan("pdclient.GetRegion", opentracing.ChildOf(span.Context())) + defer span.Finish() + } + start := time.Now() + defer func() { cmdDurationGetRegion.Observe(time.Since(start).Seconds()) }() + ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + + options := &GetRegionOp{} + for _, opt := range opts { + opt(options) + } + req := &pdpb.GetRegionRequest{ + Header: c.requestHeader(), + RegionKey: key, + NeedBuckets: options.needBuckets, + } + protoClient, ctx := c.getClientAndContext(ctx) + if protoClient == nil { + cancel() + return nil, errs.ErrClientGetProtoClient + } + resp, err := protoClient.GetRegion(ctx, req) + cancel() + + if err = c.respForErr(cmdFailDurationGetRegion, start, err, resp.GetHeader()); err != nil { + return nil, err + } + return handleRegionResponse(resp), nil +} + func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error) { if span := opentracing.SpanFromContext(ctx); span != nil { span = opentracing.StartSpan("pdclient.GetPrevRegion", opentracing.ChildOf(span.Context())) @@ -1022,8 +1031,7 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegio RegionKey: key, NeedBuckets: options.needBuckets, } - ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) - protoClient := c.getClient() + protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { cancel() return nil, errs.ErrClientGetProtoClient @@ -1055,8 +1063,7 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...Get RegionId: regionID, NeedBuckets: options.needBuckets, } - ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) - protoClient := c.getClient() + protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { cancel() return nil, errs.ErrClientGetProtoClient @@ -1090,8 +1097,7 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int, EndKey: endKey, Limit: int32(limit), } - scanCtx = grpcutil.BuildForwardContext(scanCtx, c.GetLeaderAddr()) - protoClient := c.getClient() + protoClient, scanCtx := c.getClientAndContext(scanCtx) if protoClient == nil { cancel() return nil, errs.ErrClientGetProtoClient @@ -1146,8 +1152,7 @@ func (c *client) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, e Header: c.requestHeader(), StoreId: storeID, } - ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) - protoClient := c.getClient() + protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { cancel() return nil, errs.ErrClientGetProtoClient @@ -1191,8 +1196,7 @@ func (c *client) GetAllStores(ctx context.Context, opts ...GetStoreOption) ([]*m Header: c.requestHeader(), ExcludeTombstoneStores: options.excludeTombstone, } - ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) - protoClient := 
c.getClient() + protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { cancel() return nil, errs.ErrClientGetProtoClient @@ -1219,8 +1223,7 @@ func (c *client) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint6 Header: c.requestHeader(), SafePoint: safePoint, } - ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) - protoClient := c.getClient() + protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { cancel() return 0, errs.ErrClientGetProtoClient @@ -1254,8 +1257,7 @@ func (c *client) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, TTL: ttl, SafePoint: safePoint, } - ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) - protoClient := c.getClient() + protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { cancel() return 0, errs.ErrClientGetProtoClient @@ -1287,8 +1289,7 @@ func (c *client) scatterRegionsWithGroup(ctx context.Context, regionID uint64, g RegionId: regionID, Group: group, } - ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) - protoClient := c.getClient() + protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { cancel() return errs.ErrClientGetProtoClient @@ -1332,8 +1333,7 @@ func (c *client) SplitAndScatterRegions(ctx context.Context, splitKeys [][]byte, RetryLimit: options.retryLimit, } - ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) - protoClient := c.getClient() + protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { cancel() return nil, errs.ErrClientGetProtoClient @@ -1355,8 +1355,7 @@ func (c *client) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOpe Header: c.requestHeader(), RegionId: regionID, } - ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) - protoClient := c.getClient() + protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { cancel() return nil, errs.ErrClientGetProtoClient @@ -1383,8 +1382,7 @@ func (c *client) SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...R SplitKeys: splitKeys, RetryLimit: options.retryLimit, } - ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) - protoClient := c.getClient() + protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { cancel() return nil, errs.ErrClientGetProtoClient @@ -1414,8 +1412,7 @@ func (c *client) scatterRegionsWithOptions(ctx context.Context, regionsID []uint SkipStoreLimit: options.skipStoreLimit, } - ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) - protoClient := c.getClient() + protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { cancel() return nil, errs.ErrClientGetProtoClient @@ -1465,8 +1462,7 @@ func trimHTTPPrefix(str string) string { func (c *client) LoadGlobalConfig(ctx context.Context, names []string, configPath string) ([]GlobalConfigItem, int64, error) { ctx, cancel := context.WithTimeout(ctx, c.option.timeout) defer cancel() - ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) - protoClient := c.getClient() + protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { return nil, 0, errs.ErrClientGetProtoClient } @@ -1497,8 +1493,7 @@ func (c *client) StoreGlobalConfig(ctx context.Context, configPath string, items } ctx, cancel := context.WithTimeout(ctx, c.option.timeout) defer cancel() - ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) - protoClient := c.getClient() + protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { return errs.ErrClientGetProtoClient } @@ -1515,8 +1510,7 @@ func (c *client) 
WatchGlobalConfig(ctx context.Context, configPath string, revis globalConfigWatcherCh := make(chan []GlobalConfigItem, 16) ctx, cancel := context.WithTimeout(ctx, c.option.timeout) defer cancel() - ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) - protoClient := c.getClient() + protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { return nil, errs.ErrClientGetProtoClient } @@ -1564,8 +1558,7 @@ func (c *client) WatchGlobalConfig(ctx context.Context, configPath string, revis func (c *client) GetExternalTimestamp(ctx context.Context) (uint64, error) { ctx, cancel := context.WithTimeout(ctx, c.option.timeout) defer cancel() - ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) - protoClient := c.getClient() + protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { return 0, errs.ErrClientGetProtoClient } @@ -1585,8 +1578,7 @@ func (c *client) GetExternalTimestamp(ctx context.Context) (uint64, error) { func (c *client) SetExternalTimestamp(ctx context.Context, timestamp uint64) error { ctx, cancel := context.WithTimeout(ctx, c.option.timeout) defer cancel() - ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) - protoClient := c.getClient() + protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { return errs.ErrClientGetProtoClient } diff --git a/client/gc_client.go b/client/gc_client.go index b5d64e25129..fff292405c2 100644 --- a/client/gc_client.go +++ b/client/gc_client.go @@ -22,7 +22,6 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" "github.com/tikv/pd/client/errs" - "github.com/tikv/pd/client/grpcutil" "go.uber.org/zap" ) @@ -48,8 +47,7 @@ func (c *client) UpdateGCSafePointV2(ctx context.Context, keyspaceID uint32, saf KeyspaceId: keyspaceID, SafePoint: safePoint, } - ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) - protoClient := c.getClient() + protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { cancel() return 0, errs.ErrClientGetProtoClient @@ -80,8 +78,7 @@ func (c *client) UpdateServiceSafePointV2(ctx context.Context, keyspaceID uint32 SafePoint: safePoint, Ttl: ttl, } - ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) - protoClient := c.getClient() + protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { cancel() return 0, errs.ErrClientGetProtoClient @@ -104,8 +101,7 @@ func (c *client) WatchGCSafePointV2(ctx context.Context, revision int64) (chan [ ctx, cancel := context.WithTimeout(ctx, c.option.timeout) defer cancel() - ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) - protoClient := c.getClient() + protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { return nil, errs.ErrClientGetProtoClient } diff --git a/client/keyspace_client.go b/client/keyspace_client.go index d9b9172dd69..fedb7452412 100644 --- a/client/keyspace_client.go +++ b/client/keyspace_client.go @@ -21,7 +21,6 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/keyspacepb" - "github.com/tikv/pd/client/grpcutil" ) // KeyspaceClient manages keyspace metadata. 
@@ -57,7 +56,6 @@ func (c *client) LoadKeyspace(ctx context.Context, name string) (*keyspacepb.Key Header: c.requestHeader(), Name: name, } - ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) resp, err := c.keyspaceClient().LoadKeyspace(ctx, req) cancel() @@ -98,7 +96,6 @@ func (c *client) UpdateKeyspaceState(ctx context.Context, id uint32, state keysp Id: id, State: state, } - ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) resp, err := c.keyspaceClient().UpdateKeyspaceState(ctx, req) cancel() @@ -138,7 +135,6 @@ func (c *client) GetAllKeyspaces(ctx context.Context, startID uint32, limit uint StartId: startID, Limit: limit, } - ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) resp, err := c.keyspaceClient().GetAllKeyspaces(ctx, req) cancel() From 871be59ab0dfb71d8ad1ee0ff918978f01d7728d Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 29 Nov 2023 20:02:47 +0800 Subject: [PATCH 11/14] api: return `[]` rather than `null` for empty scheduler results (#7454) close tikv/pd#7452 Signed-off-by: lhy1024 --- pkg/schedule/handler/handler.go | 6 ++-- tests/pdctl/scheduler/scheduler_test.go | 4 +-- tests/server/api/scheduler_test.go | 47 +++++++++++++++++++++++++ 3 files changed, 52 insertions(+), 5 deletions(-) diff --git a/pkg/schedule/handler/handler.go b/pkg/schedule/handler/handler.go index 353e2bb60e2..346a7254284 100644 --- a/pkg/schedule/handler/handler.go +++ b/pkg/schedule/handler/handler.go @@ -807,7 +807,7 @@ func (h *Handler) GetSchedulerByStatus(status string, needTS bool) (interface{}, schedulers := sc.GetSchedulerNames() switch status { case "paused": - var pausedSchedulers []string + pausedSchedulers := make([]string, 0, len(schedulers)) pausedPeriods := []schedulerPausedPeriod{} for _, scheduler := range schedulers { paused, err := sc.IsSchedulerPaused(scheduler) @@ -842,7 +842,7 @@ func (h *Handler) GetSchedulerByStatus(status string, needTS bool) (interface{}, } return pausedSchedulers, nil case "disabled": - var disabledSchedulers []string + disabledSchedulers := make([]string, 0, len(schedulers)) for _, scheduler := range schedulers { disabled, err := sc.IsSchedulerDisabled(scheduler) if err != nil { @@ -857,7 +857,7 @@ func (h *Handler) GetSchedulerByStatus(status string, needTS bool) (interface{}, // The default scheduler could not be deleted in scheduling server, // so schedulers could only be disabled. // We should not return the disabled schedulers here. - var enabledSchedulers []string + enabledSchedulers := make([]string, 0, len(schedulers)) for _, scheduler := range schedulers { disabled, err := sc.IsSchedulerDisabled(scheduler) if err != nil { diff --git a/tests/pdctl/scheduler/scheduler_test.go b/tests/pdctl/scheduler/scheduler_test.go index 7e9aeef16ee..d5bea895683 100644 --- a/tests/pdctl/scheduler/scheduler_test.go +++ b/tests/pdctl/scheduler/scheduler_test.go @@ -545,7 +545,7 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { mustUsage([]string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler", "60"}) echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler"}, nil) re.Contains(echo, "Success!") - checkSchedulerWithStatusCommand("paused", nil) + checkSchedulerWithStatusCommand("paused", []string{}) // set label scheduler to disabled manually. 
echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "label-scheduler"}, nil) @@ -560,7 +560,7 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { cfg.Schedulers = origin err = leaderServer.GetServer().SetScheduleConfig(*cfg) re.NoError(err) - checkSchedulerWithStatusCommand("disabled", nil) + checkSchedulerWithStatusCommand("disabled", []string{}) } func (suite *schedulerTestSuite) TestSchedulerDiagnostic() { diff --git a/tests/server/api/scheduler_test.go b/tests/server/api/scheduler_test.go index 86b932b0a7a..2388f95e9df 100644 --- a/tests/server/api/scheduler_test.go +++ b/tests/server/api/scheduler_test.go @@ -17,6 +17,7 @@ package api import ( "encoding/json" "fmt" + "io" "net/http" "reflect" "testing" @@ -28,6 +29,7 @@ import ( "github.com/stretchr/testify/suite" sc "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/slice" + "github.com/tikv/pd/pkg/utils/apiutil" tu "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/server" "github.com/tikv/pd/tests" @@ -673,6 +675,40 @@ func (suite *scheduleTestSuite) testPauseOrResume(urlPrefix string, name, create suite.False(isPaused) } +func (suite *scheduleTestSuite) TestEmptySchedulers() { + env := tests.NewSchedulingTestEnvironment(suite.T()) + env.RunTestInTwoModes(suite.checkEmptySchedulers) +} + +func (suite *scheduleTestSuite) checkEmptySchedulers(cluster *tests.TestCluster) { + re := suite.Require() + leaderAddr := cluster.GetLeaderServer().GetAddr() + urlPrefix := fmt.Sprintf("%s/pd/api/v1/schedulers", leaderAddr) + for i := 1; i <= 4; i++ { + store := &metapb.Store{ + Id: uint64(i), + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + LastHeartbeat: time.Now().UnixNano(), + } + tests.MustPutStore(suite.Require(), cluster, store) + } + + // test disabled and paused schedulers + suite.checkEmptySchedulersResp(urlPrefix + "?status=disabled") + suite.checkEmptySchedulersResp(urlPrefix + "?status=paused") + + // test enabled schedulers + schedulers := make([]string, 0) + suite.NoError(tu.ReadGetJSON(re, testDialClient, urlPrefix, &schedulers)) + for _, scheduler := range schedulers { + suite.deleteScheduler(urlPrefix, scheduler) + } + suite.NoError(tu.ReadGetJSON(re, testDialClient, urlPrefix, &schedulers)) + suite.Len(schedulers, 0) + suite.checkEmptySchedulersResp(urlPrefix) +} + func (suite *scheduleTestSuite) assertSchedulerExists(re *require.Assertions, urlPrefix string, scheduler string) { var schedulers []string tu.Eventually(re, func() bool { @@ -700,3 +736,14 @@ func (suite *scheduleTestSuite) isSchedulerPaused(urlPrefix, name string) bool { } return false } + +func (suite *scheduleTestSuite) checkEmptySchedulersResp(url string) { + resp, err := apiutil.GetJSON(testDialClient, url, nil) + suite.NoError(err) + defer resp.Body.Close() + suite.Equal(http.StatusOK, resp.StatusCode) + b, err := io.ReadAll(resp.Body) + suite.NoError(err) + suite.Contains(string(b), "[]") + suite.NotContains(string(b), "null") +} From 150139c48895b019938db9d1a33d5138d9254047 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 30 Nov 2023 06:29:49 +0000 Subject: [PATCH 12/14] build(deps): bump google.golang.org/grpc from 1.54.0 to 1.56.3 in /tools/pd-tso-bench (#7259) ref tikv/pd#4399 Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: disksing Co-authored-by: JmPotato --- tools/pd-tso-bench/go.mod | 2 +- tools/pd-tso-bench/go.sum | 4 
++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tools/pd-tso-bench/go.mod b/tools/pd-tso-bench/go.mod index f89f11ee082..8d4b3d18a31 100644 --- a/tools/pd-tso-bench/go.mod +++ b/tools/pd-tso-bench/go.mod @@ -9,7 +9,7 @@ require ( github.com/prometheus/client_golang v1.11.1 github.com/tikv/pd/client v0.0.0-00010101000000-000000000000 go.uber.org/zap v1.24.0 - google.golang.org/grpc v1.54.0 + google.golang.org/grpc v1.56.3 ) require ( diff --git a/tools/pd-tso-bench/go.sum b/tools/pd-tso-bench/go.sum index 1c266823dee..15ba2923695 100644 --- a/tools/pd-tso-bench/go.sum +++ b/tools/pd-tso-bench/go.sum @@ -205,8 +205,8 @@ gonum.org/v1/netlib v0.0.0-20181029234149-ec6d1f5cefe6/go.mod h1:wa6Ws7BG/ESfp6d google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 h1:KpwkzHKEF7B9Zxg18WzOa7djJ+Ha5DzthMyZYQfEn2A= google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU= -google.golang.org/grpc v1.54.0 h1:EhTqbhiYeixwWQtAEZAxmV9MGqcjEU2mFx52xCzNyag= -google.golang.org/grpc v1.54.0/go.mod h1:PUSEXI6iWghWaB6lXM4knEgpJNu2qUcKfDtNci3EC2g= +google.golang.org/grpc v1.56.3 h1:8I4C0Yq1EjstUzUJzpcRVbuYA2mODtEmpWiQoN/b2nc= +google.golang.org/grpc v1.56.3/go.mod h1:I9bI3vqKfayGqPUAwGdOSu7kt6oIJLixfffKrpXqQ9s= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= From beedacb8b291b60db4108ba94cbf75fc72ab20a2 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Thu, 30 Nov 2023 15:11:49 +0800 Subject: [PATCH 13/14] schedulers: fix config data race (#7486) close tikv/pd#7485 Signed-off-by: lhy1024 --- pkg/schedule/schedulers/balance_leader.go | 36 ++++++++++++------- pkg/schedule/schedulers/balance_witness.go | 24 +++++++++---- pkg/schedule/schedulers/grant_hot_region.go | 13 +++++-- pkg/schedule/schedulers/grant_leader.go | 17 ++++++--- pkg/schedule/schedulers/hot_region_config.go | 4 +-- pkg/schedule/schedulers/hot_region_test.go | 28 +++++++-------- pkg/schedule/schedulers/scatter_range.go | 4 +-- pkg/schedule/schedulers/shuffle_hot_region.go | 8 ++++- pkg/schedule/schedulers/split_bucket.go | 16 +++++++-- 9 files changed, 105 insertions(+), 45 deletions(-) diff --git a/pkg/schedule/schedulers/balance_leader.go b/pkg/schedule/schedulers/balance_leader.go index 6a64edf7e70..eb94752944b 100644 --- a/pkg/schedule/schedulers/balance_leader.go +++ b/pkg/schedule/schedulers/balance_leader.go @@ -78,19 +78,19 @@ func (conf *balanceLeaderSchedulerConfig) Update(data []byte) (int, interface{}) conf.Lock() defer conf.Unlock() - oldc, _ := json.Marshal(conf) + oldConfig, _ := json.Marshal(conf) if err := json.Unmarshal(data, conf); err != nil { return http.StatusInternalServerError, err.Error() } - newc, _ := json.Marshal(conf) - if !bytes.Equal(oldc, newc) { - if !conf.validate() { - json.Unmarshal(oldc, conf) + newConfig, _ := json.Marshal(conf) + if !bytes.Equal(oldConfig, newConfig) { + if !conf.validateLocked() { + json.Unmarshal(oldConfig, conf) return http.StatusBadRequest, "invalid batch size which should be an integer between 1 and 10" } conf.persistLocked() - log.Info("balance-leader-scheduler config is updated", zap.ByteString("old", oldc), 
zap.ByteString("new", newc)) + log.Info("balance-leader-scheduler config is updated", zap.ByteString("old", oldConfig), zap.ByteString("new", newConfig)) return http.StatusOK, "Config is updated." } m := make(map[string]interface{}) @@ -104,7 +104,7 @@ func (conf *balanceLeaderSchedulerConfig) Update(data []byte) (int, interface{}) return http.StatusBadRequest, "Config item is not found." } -func (conf *balanceLeaderSchedulerConfig) validate() bool { +func (conf *balanceLeaderSchedulerConfig) validateLocked() bool { return conf.Batch >= 1 && conf.Batch <= 10 } @@ -127,6 +127,20 @@ func (conf *balanceLeaderSchedulerConfig) persistLocked() error { return conf.storage.SaveSchedulerConfig(BalanceLeaderName, data) } +func (conf *balanceLeaderSchedulerConfig) getBatch() int { + conf.RLock() + defer conf.RUnlock() + return conf.Batch +} + +func (conf *balanceLeaderSchedulerConfig) getRanges() []core.KeyRange { + conf.RLock() + defer conf.RUnlock() + ranges := make([]core.KeyRange, len(conf.Ranges)) + copy(ranges, conf.Ranges) + return ranges +} + type balanceLeaderHandler struct { rd *render.Render config *balanceLeaderSchedulerConfig @@ -335,14 +349,12 @@ func (cs *candidateStores) resortStoreWithPos(pos int) { } func (l *balanceLeaderScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { - l.conf.RLock() - defer l.conf.RUnlock() basePlan := plan.NewBalanceSchedulerPlan() var collector *plan.Collector if dryRun { collector = plan.NewCollector(basePlan) } - batch := l.conf.Batch + batch := l.conf.getBatch() balanceLeaderScheduleCounter.Inc() leaderSchedulePolicy := cluster.GetSchedulerConfig().GetLeaderSchedulePolicy() @@ -441,7 +453,7 @@ func makeInfluence(op *operator.Operator, plan *solver, usedRegions map[uint64]s // It randomly selects a health region from the source store, then picks // the best follower peer and transfers the leader. func (l *balanceLeaderScheduler) transferLeaderOut(solver *solver, collector *plan.Collector) *operator.Operator { - solver.Region = filter.SelectOneRegion(solver.RandLeaderRegions(solver.SourceStoreID(), l.conf.Ranges), + solver.Region = filter.SelectOneRegion(solver.RandLeaderRegions(solver.SourceStoreID(), l.conf.getRanges()), collector, filter.NewRegionPendingFilter(), filter.NewRegionDownFilter()) if solver.Region == nil { log.Debug("store has no leader", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", solver.SourceStoreID())) @@ -485,7 +497,7 @@ func (l *balanceLeaderScheduler) transferLeaderOut(solver *solver, collector *pl // It randomly selects a health region from the target store, then picks // the worst follower peer and transfers the leader. 
func (l *balanceLeaderScheduler) transferLeaderIn(solver *solver, collector *plan.Collector) *operator.Operator { - solver.Region = filter.SelectOneRegion(solver.RandFollowerRegions(solver.TargetStoreID(), l.conf.Ranges), + solver.Region = filter.SelectOneRegion(solver.RandFollowerRegions(solver.TargetStoreID(), l.conf.getRanges()), nil, filter.NewRegionPendingFilter(), filter.NewRegionDownFilter()) if solver.Region == nil { log.Debug("store has no follower", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", solver.TargetStoreID())) diff --git a/pkg/schedule/schedulers/balance_witness.go b/pkg/schedule/schedulers/balance_witness.go index bf3fbbb83da..9994866ac50 100644 --- a/pkg/schedule/schedulers/balance_witness.go +++ b/pkg/schedule/schedulers/balance_witness.go @@ -71,7 +71,7 @@ func (conf *balanceWitnessSchedulerConfig) Update(data []byte) (int, interface{} } newc, _ := json.Marshal(conf) if !bytes.Equal(oldc, newc) { - if !conf.validate() { + if !conf.validateLocked() { json.Unmarshal(oldc, conf) return http.StatusBadRequest, "invalid batch size which should be an integer between 1 and 10" } @@ -90,7 +90,7 @@ func (conf *balanceWitnessSchedulerConfig) Update(data []byte) (int, interface{} return http.StatusBadRequest, "Config item is not found." } -func (conf *balanceWitnessSchedulerConfig) validate() bool { +func (conf *balanceWitnessSchedulerConfig) validateLocked() bool { return conf.Batch >= 1 && conf.Batch <= 10 } @@ -113,6 +113,20 @@ func (conf *balanceWitnessSchedulerConfig) persistLocked() error { return conf.storage.SaveSchedulerConfig(BalanceWitnessName, data) } +func (conf *balanceWitnessSchedulerConfig) getBatch() int { + conf.RLock() + defer conf.RUnlock() + return conf.Batch +} + +func (conf *balanceWitnessSchedulerConfig) getRanges() []core.KeyRange { + conf.RLock() + defer conf.RUnlock() + ranges := make([]core.KeyRange, len(conf.Ranges)) + copy(ranges, conf.Ranges) + return ranges +} + type balanceWitnessHandler struct { rd *render.Render config *balanceWitnessSchedulerConfig @@ -238,14 +252,12 @@ func (b *balanceWitnessScheduler) IsScheduleAllowed(cluster sche.SchedulerCluste } func (b *balanceWitnessScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { - b.conf.RLock() - defer b.conf.RUnlock() basePlan := plan.NewBalanceSchedulerPlan() var collector *plan.Collector if dryRun { collector = plan.NewCollector(basePlan) } - batch := b.conf.Batch + batch := b.conf.getBatch() schedulerCounter.WithLabelValues(b.GetName(), "schedule").Inc() opInfluence := b.OpController.GetOpInfluence(cluster.GetBasicCluster()) @@ -305,7 +317,7 @@ func createTransferWitnessOperator(cs *candidateStores, b *balanceWitnessSchedul // It randomly selects a health region from the source store, then picks // the best follower peer and transfers the witness. 
func (b *balanceWitnessScheduler) transferWitnessOut(solver *solver, collector *plan.Collector) *operator.Operator { - solver.Region = filter.SelectOneRegion(solver.RandWitnessRegions(solver.SourceStoreID(), b.conf.Ranges), + solver.Region = filter.SelectOneRegion(solver.RandWitnessRegions(solver.SourceStoreID(), b.conf.getRanges()), collector, filter.NewRegionPendingFilter(), filter.NewRegionDownFilter()) if solver.Region == nil { log.Debug("store has no witness", zap.String("scheduler", b.GetName()), zap.Uint64("store-id", solver.SourceStoreID())) diff --git a/pkg/schedule/schedulers/grant_hot_region.go b/pkg/schedule/schedulers/grant_hot_region.go index 6ab689ea5d4..81399b58c58 100644 --- a/pkg/schedule/schedulers/grant_hot_region.go +++ b/pkg/schedule/schedulers/grant_hot_region.go @@ -120,6 +120,14 @@ func (conf *grantHotRegionSchedulerConfig) has(storeID uint64) bool { }) } +func (conf *grantHotRegionSchedulerConfig) getStoreIDs() []uint64 { + conf.RLock() + defer conf.RUnlock() + storeIDs := make([]uint64, len(conf.StoreIDs)) + copy(storeIDs, conf.StoreIDs) + return storeIDs +} + // grantLeaderScheduler transfers all hot peers to peers and transfer leader to the fixed store type grantHotRegionScheduler struct { *baseHotScheduler @@ -313,7 +321,8 @@ func (s *grantHotRegionScheduler) transfer(cluster sche.SchedulerCluster, region filter.NewPlacementSafeguard(s.GetName(), cluster.GetSchedulerConfig(), cluster.GetBasicCluster(), cluster.GetRuleManager(), srcRegion, srcStore, nil), } - destStoreIDs := make([]uint64, 0, len(s.conf.StoreIDs)) + storeIDs := s.conf.getStoreIDs() + destStoreIDs := make([]uint64, 0, len(storeIDs)) var candidate []uint64 if isLeader { filters = append(filters, &filter.StoreStateFilter{ActionScope: s.GetName(), TransferLeader: true, OperatorLevel: constant.High}) @@ -321,7 +330,7 @@ func (s *grantHotRegionScheduler) transfer(cluster sche.SchedulerCluster, region } else { filters = append(filters, &filter.StoreStateFilter{ActionScope: s.GetName(), MoveRegion: true, OperatorLevel: constant.High}, filter.NewExcludedFilter(s.GetName(), srcRegion.GetStoreIDs(), srcRegion.GetStoreIDs())) - candidate = s.conf.StoreIDs + candidate = storeIDs } for _, storeID := range candidate { store := cluster.GetStore(storeID) diff --git a/pkg/schedule/schedulers/grant_leader.go b/pkg/schedule/schedulers/grant_leader.go index 47e14af4902..885f81e2442 100644 --- a/pkg/schedule/schedulers/grant_leader.go +++ b/pkg/schedule/schedulers/grant_leader.go @@ -143,6 +143,16 @@ func (conf *grantLeaderSchedulerConfig) getKeyRangesByID(id uint64) []core.KeyRa return nil } +func (conf *grantLeaderSchedulerConfig) getStoreIDWithRanges() map[uint64][]core.KeyRange { + conf.RLock() + defer conf.RUnlock() + storeIDWithRanges := make(map[uint64][]core.KeyRange) + for id, ranges := range conf.StoreIDWithRanges { + storeIDWithRanges[id] = ranges + } + return storeIDWithRanges +} + // grantLeaderScheduler transfers all leaders to peers in the store. 
type grantLeaderScheduler struct { *BaseScheduler @@ -227,12 +237,11 @@ func (s *grantLeaderScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) func (s *grantLeaderScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { grantLeaderCounter.Inc() - s.conf.RLock() - defer s.conf.RUnlock() - ops := make([]*operator.Operator, 0, len(s.conf.StoreIDWithRanges)) + storeIDWithRanges := s.conf.getStoreIDWithRanges() + ops := make([]*operator.Operator, 0, len(storeIDWithRanges)) pendingFilter := filter.NewRegionPendingFilter() downFilter := filter.NewRegionDownFilter() - for id, ranges := range s.conf.StoreIDWithRanges { + for id, ranges := range storeIDWithRanges { region := filter.SelectOneRegion(cluster.RandFollowerRegions(id, ranges), nil, pendingFilter, downFilter) if region == nil { grantLeaderNoFollowerCounter.Inc() diff --git a/pkg/schedule/schedulers/hot_region_config.go b/pkg/schedule/schedulers/hot_region_config.go index 2ff78748f02..3f9f8b8c669 100644 --- a/pkg/schedule/schedulers/hot_region_config.go +++ b/pkg/schedule/schedulers/hot_region_config.go @@ -366,7 +366,7 @@ func isPriorityValid(priorities []string) (map[string]bool, error) { return priorityMap, nil } -func (conf *hotRegionSchedulerConfig) valid() error { +func (conf *hotRegionSchedulerConfig) validateLocked() error { if _, err := isPriorityValid(conf.ReadPriorities); err != nil { return err } @@ -409,7 +409,7 @@ func (conf *hotRegionSchedulerConfig) handleSetConfig(w http.ResponseWriter, r * rd.JSON(w, http.StatusInternalServerError, err.Error()) return } - if err := conf.valid(); err != nil { + if err := conf.validateLocked(); err != nil { // revert to old version if err2 := json.Unmarshal(oldc, conf); err2 != nil { rd.JSON(w, http.StatusInternalServerError, err2.Error()) diff --git a/pkg/schedule/schedulers/hot_region_test.go b/pkg/schedule/schedulers/hot_region_test.go index 15c037ddd22..6e7208e4251 100644 --- a/pkg/schedule/schedulers/hot_region_test.go +++ b/pkg/schedule/schedulers/hot_region_test.go @@ -2499,32 +2499,32 @@ func TestConfigValidation(t *testing.T) { re := require.New(t) hc := initHotRegionScheduleConfig() - err := hc.valid() + err := hc.validateLocked() re.NoError(err) // priorities is illegal hc.ReadPriorities = []string{"byte", "error"} - err = hc.valid() + err = hc.validateLocked() re.Error(err) // priorities should have at least 2 dimensions hc = initHotRegionScheduleConfig() hc.WriteLeaderPriorities = []string{"byte"} - err = hc.valid() + err = hc.validateLocked() re.Error(err) // query is not allowed to be set in priorities for write-peer-priorities hc = initHotRegionScheduleConfig() hc.WritePeerPriorities = []string{"query", "byte"} - err = hc.valid() + err = hc.validateLocked() re.Error(err) // priorities shouldn't be repeated hc.WritePeerPriorities = []string{"byte", "byte"} - err = hc.valid() + err = hc.validateLocked() re.Error(err) // no error hc.WritePeerPriorities = []string{"byte", "key"} - err = hc.valid() + err = hc.validateLocked() re.NoError(err) // rank-formula-version @@ -2533,17 +2533,17 @@ func TestConfigValidation(t *testing.T) { re.Equal("v2", hc.GetRankFormulaVersion()) // v1 hc.RankFormulaVersion = "v1" - err = hc.valid() + err = hc.validateLocked() re.NoError(err) re.Equal("v1", hc.GetRankFormulaVersion()) // v2 hc.RankFormulaVersion = "v2" - err = hc.valid() + err = hc.validateLocked() re.NoError(err) re.Equal("v2", hc.GetRankFormulaVersion()) // illegal hc.RankFormulaVersion = "v0" - err = hc.valid() + err = 
hc.validateLocked() re.Error(err) // forbid-rw-type @@ -2553,27 +2553,27 @@ func TestConfigValidation(t *testing.T) { re.False(hc.IsForbidRWType(utils.Write)) // read hc.ForbidRWType = "read" - err = hc.valid() + err = hc.validateLocked() re.NoError(err) re.True(hc.IsForbidRWType(utils.Read)) re.False(hc.IsForbidRWType(utils.Write)) // write hc.ForbidRWType = "write" - err = hc.valid() + err = hc.validateLocked() re.NoError(err) re.False(hc.IsForbidRWType(utils.Read)) re.True(hc.IsForbidRWType(utils.Write)) // illegal hc.ForbidRWType = "test" - err = hc.valid() + err = hc.validateLocked() re.Error(err) hc.SplitThresholds = 0 - err = hc.valid() + err = hc.validateLocked() re.Error(err) hc.SplitThresholds = 1.1 - err = hc.valid() + err = hc.validateLocked() re.Error(err) } diff --git a/pkg/schedule/schedulers/scatter_range.go b/pkg/schedule/schedulers/scatter_range.go index 1bc6eafb58e..977d8cff05c 100644 --- a/pkg/schedule/schedulers/scatter_range.go +++ b/pkg/schedule/schedulers/scatter_range.go @@ -214,7 +214,7 @@ func (l *scatterRangeScheduler) Schedule(cluster sche.SchedulerCluster, dryRun b if l.allowBalanceLeader(cluster) { ops, _ := l.balanceLeader.Schedule(c, false) if len(ops) > 0 { - ops[0].SetDesc(fmt.Sprintf("scatter-range-leader-%s", l.config.RangeName)) + ops[0].SetDesc(fmt.Sprintf("scatter-range-leader-%s", l.config.GetRangeName())) ops[0].AttachKind(operator.OpRange) ops[0].Counters = append(ops[0].Counters, scatterRangeNewOperatorCounter, @@ -226,7 +226,7 @@ func (l *scatterRangeScheduler) Schedule(cluster sche.SchedulerCluster, dryRun b if l.allowBalanceRegion(cluster) { ops, _ := l.balanceRegion.Schedule(c, false) if len(ops) > 0 { - ops[0].SetDesc(fmt.Sprintf("scatter-range-region-%s", l.config.RangeName)) + ops[0].SetDesc(fmt.Sprintf("scatter-range-region-%s", l.config.GetRangeName())) ops[0].AttachKind(operator.OpRange) ops[0].Counters = append(ops[0].Counters, scatterRangeNewOperatorCounter, diff --git a/pkg/schedule/schedulers/shuffle_hot_region.go b/pkg/schedule/schedulers/shuffle_hot_region.go index 6ad6656fd18..cd5c40d4e07 100644 --- a/pkg/schedule/schedulers/shuffle_hot_region.go +++ b/pkg/schedule/schedulers/shuffle_hot_region.go @@ -77,6 +77,12 @@ func (conf *shuffleHotRegionSchedulerConfig) persistLocked() error { return conf.storage.SaveSchedulerConfig(name, data) } +func (conf *shuffleHotRegionSchedulerConfig) getLimit() uint64 { + conf.RLock() + defer conf.RUnlock() + return conf.Limit +} + // ShuffleHotRegionScheduler mainly used to test. 
// It will randomly pick a hot peer, and move the peer // to a random store, and then transfer the leader to @@ -134,7 +140,7 @@ func (s *shuffleHotRegionScheduler) ReloadConfig() error { } func (s *shuffleHotRegionScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { - hotRegionAllowed := s.OpController.OperatorCount(operator.OpHotRegion) < s.conf.Limit + hotRegionAllowed := s.OpController.OperatorCount(operator.OpHotRegion) < s.conf.getLimit() conf := cluster.GetSchedulerConfig() regionAllowed := s.OpController.OperatorCount(operator.OpRegion) < conf.GetRegionScheduleLimit() leaderAllowed := s.OpController.OperatorCount(operator.OpLeader) < conf.GetLeaderScheduleLimit() diff --git a/pkg/schedule/schedulers/split_bucket.go b/pkg/schedule/schedulers/split_bucket.go index 6faf03e1fef..5e31f58129c 100644 --- a/pkg/schedule/schedulers/split_bucket.go +++ b/pkg/schedule/schedulers/split_bucket.go @@ -87,6 +87,18 @@ func (conf *splitBucketSchedulerConfig) persistLocked() error { return conf.storage.SaveSchedulerConfig(SplitBucketName, data) } +func (conf *splitBucketSchedulerConfig) getDegree() int { + conf.RLock() + defer conf.RUnlock() + return conf.Degree +} + +func (conf *splitBucketSchedulerConfig) getSplitLimit() uint64 { + conf.RLock() + defer conf.RUnlock() + return conf.SplitLimit +} + type splitBucketScheduler struct { *BaseScheduler conf *splitBucketSchedulerConfig @@ -202,7 +214,7 @@ func (s *splitBucketScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) splitBucketDisableCounter.Inc() return false } - allowed := s.BaseScheduler.OpController.OperatorCount(operator.OpSplit) < s.conf.SplitLimit + allowed := s.BaseScheduler.OpController.OperatorCount(operator.OpSplit) < s.conf.getSplitLimit() if !allowed { splitBuckerSplitLimitCounter.Inc() operator.OperatorLimitCounter.WithLabelValues(s.GetType(), operator.OpSplit.String()).Inc() @@ -224,7 +236,7 @@ func (s *splitBucketScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bo plan := &splitBucketPlan{ conf: conf, cluster: cluster, - hotBuckets: cluster.BucketsStats(conf.Degree), + hotBuckets: cluster.BucketsStats(conf.getDegree()), hotRegionSplitSize: cluster.GetSchedulerConfig().GetMaxMovableHotPeerSize(), } return s.splitBucket(plan), nil From 862eee18738eabb651601a9362547e5283ee830a Mon Sep 17 00:00:00 2001 From: JmPotato Date: Thu, 30 Nov 2023 16:16:18 +0800 Subject: [PATCH 14/14] client/http: implement the marshaler interfaces for Rule/RuleOp (#7462) ref tikv/pd#7300 Implement the marshaler interfaces for `Rule` and `RuleOP` to make sure we could set/get the correct start/end key. Ref https://github.com/pingcap/tidb/blob/46d4231c8b0ade353b98572e7c2a015bddf940f4/pkg/ddl/placement/rule.go#L76-L91. 
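For illustration, a minimal sketch (not part of this patch; the import alias and the field values are assumptions) of the intended round-trip: a `Rule` built with raw start/end keys marshals its `start_key`/`end_key` as encoded hex strings, and unmarshaling recovers the raw keys from those hex fields.

    package main

    import (
        "encoding/json"
        "fmt"

        pdhttp "github.com/tikv/pd/client/http"
    )

    func main() {
        rule := &pdhttp.Rule{
            GroupID:  "pd",
            ID:       "example-rule", // hypothetical rule ID, used only for this sketch
            StartKey: []byte("a"),
            EndKey:   []byte("b"),
            Role:     "voter",
            Count:    3,
        }
        // MarshalJSON fills start_key/end_key with the hex form of the raw keys.
        data, err := json.Marshal(rule)
        if err != nil {
            panic(err)
        }
        var decoded pdhttp.Rule
        // UnmarshalJSON decodes the hex fields back into StartKey/EndKey.
        if err := json.Unmarshal(data, &decoded); err != nil {
            panic(err)
        }
        fmt.Printf("start=%q end=%q\n", decoded.StartKey, decoded.EndKey)
    }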
Signed-off-by: JmPotato --- client/http/api.go | 14 +- client/http/client.go | 21 ++- client/http/codec.go | 121 +++++++++++++ client/http/codec_test.go | 64 +++++++ client/http/types.go | 168 ++++++++++++++++++ client/http/types_test.go | 151 ++++++++++++++++ server/api/stats.go | 5 +- server/api/store.go | 16 +- tests/integrations/client/http_client_test.go | 46 +++-- 9 files changed, 582 insertions(+), 24 deletions(-) create mode 100644 client/http/codec.go create mode 100644 client/http/codec_test.go diff --git a/client/http/api.go b/client/http/api.go index 6b317330b61..f744fd0c395 100644 --- a/client/http/api.go +++ b/client/http/api.go @@ -31,6 +31,7 @@ const ( Regions = "/pd/api/v1/regions" regionsByKey = "/pd/api/v1/regions/key" RegionsByStoreIDPrefix = "/pd/api/v1/regions/store" + regionsReplicated = "/pd/api/v1/regions/replicated" EmptyRegions = "/pd/api/v1/regions/check/empty-region" AccelerateSchedule = "/pd/api/v1/regions/accelerate-schedule" AccelerateScheduleInBatch = "/pd/api/v1/regions/accelerate-schedule/batch" @@ -95,9 +96,20 @@ func RegionsByStoreID(storeID uint64) string { return fmt.Sprintf("%s/%d", RegionsByStoreIDPrefix, storeID) } +// RegionsReplicatedByKeyRange returns the path of PD HTTP API to get replicated regions with given start key and end key. +func RegionsReplicatedByKeyRange(keyRange *KeyRange) string { + startKeyStr, endKeyStr := keyRange.EscapeAsHexStr() + return fmt.Sprintf("%s?startKey=%s&endKey=%s", + regionsReplicated, startKeyStr, endKeyStr) +} + // RegionStatsByKeyRange returns the path of PD HTTP API to get region stats by start key and end key. -func RegionStatsByKeyRange(keyRange *KeyRange) string { +func RegionStatsByKeyRange(keyRange *KeyRange, onlyCount bool) string { startKeyStr, endKeyStr := keyRange.EscapeAsUTF8Str() + if onlyCount { + return fmt.Sprintf("%s?start_key=%s&end_key=%s&count", + StatsRegion, startKeyStr, endKeyStr) + } return fmt.Sprintf("%s?start_key=%s&end_key=%s", StatsRegion, startKeyStr, endKeyStr) } diff --git a/client/http/client.go b/client/http/client.go index d15693e11d4..36355a90d19 100644 --- a/client/http/client.go +++ b/client/http/client.go @@ -47,10 +47,11 @@ type Client interface { GetRegions(context.Context) (*RegionsInfo, error) GetRegionsByKeyRange(context.Context, *KeyRange, int) (*RegionsInfo, error) GetRegionsByStoreID(context.Context, uint64) (*RegionsInfo, error) + GetRegionsReplicatedStateByKeyRange(context.Context, *KeyRange) (string, error) GetHotReadRegions(context.Context) (*StoreHotPeersInfos, error) GetHotWriteRegions(context.Context) (*StoreHotPeersInfos, error) GetHistoryHotRegions(context.Context, *HistoryHotRegionsRequest) (*HistoryHotRegions, error) - GetRegionStatusByKeyRange(context.Context, *KeyRange) (*RegionStats, error) + GetRegionStatusByKeyRange(context.Context, *KeyRange, bool) (*RegionStats, error) GetStores(context.Context) (*StoresInfo, error) /* Config-related interfaces */ GetScheduleConfig(context.Context) (map[string]interface{}, error) @@ -356,6 +357,19 @@ func (c *client) GetRegionsByStoreID(ctx context.Context, storeID uint64) (*Regi return ®ions, nil } +// GetRegionsReplicatedStateByKeyRange gets the regions replicated state info by key range. +// The keys in the key range should be encoded in the hex bytes format (without encoding to the UTF-8 bytes). 
+func (c *client) GetRegionsReplicatedStateByKeyRange(ctx context.Context, keyRange *KeyRange) (string, error) { + var state string + err := c.requestWithRetry(ctx, + "GetRegionsReplicatedStateByKeyRange", RegionsReplicatedByKeyRange(keyRange), + http.MethodGet, http.NoBody, &state) + if err != nil { + return "", err + } + return state, nil +} + // GetHotReadRegions gets the hot read region statistics info. func (c *client) GetHotReadRegions(ctx context.Context) (*StoreHotPeersInfos, error) { var hotReadRegions StoreHotPeersInfos @@ -398,11 +412,12 @@ func (c *client) GetHistoryHotRegions(ctx context.Context, req *HistoryHotRegion } // GetRegionStatusByKeyRange gets the region status by key range. +// If the `onlyCount` flag is true, the result will only include the count of regions. // The keys in the key range should be encoded in the UTF-8 bytes format. -func (c *client) GetRegionStatusByKeyRange(ctx context.Context, keyRange *KeyRange) (*RegionStats, error) { +func (c *client) GetRegionStatusByKeyRange(ctx context.Context, keyRange *KeyRange, onlyCount bool) (*RegionStats, error) { var regionStats RegionStats err := c.requestWithRetry(ctx, - "GetRegionStatusByKeyRange", RegionStatsByKeyRange(keyRange), + "GetRegionStatusByKeyRange", RegionStatsByKeyRange(keyRange, onlyCount), http.MethodGet, http.NoBody, ®ionStats, ) if err != nil { diff --git a/client/http/codec.go b/client/http/codec.go new file mode 100644 index 00000000000..26be64b4f28 --- /dev/null +++ b/client/http/codec.go @@ -0,0 +1,121 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package http + +import ( + "encoding/hex" + + "github.com/pingcap/errors" +) + +const ( + encGroupSize = 8 + encMarker = byte(0xFF) + encPad = byte(0x0) +) + +var pads = make([]byte, encGroupSize) + +// encodeBytes guarantees the encoded value is in ascending order for comparison, +// encoding with the following rule: +// +// [group1][marker1]...[groupN][markerN] +// group is 8 bytes slice which is padding with 0. +// marker is `0xFF - padding 0 count` +// +// For example: +// +// [] -> [0, 0, 0, 0, 0, 0, 0, 0, 247] +// [1, 2, 3] -> [1, 2, 3, 0, 0, 0, 0, 0, 250] +// [1, 2, 3, 0] -> [1, 2, 3, 0, 0, 0, 0, 0, 251] +// [1, 2, 3, 4, 5, 6, 7, 8] -> [1, 2, 3, 4, 5, 6, 7, 8, 255, 0, 0, 0, 0, 0, 0, 0, 0, 247] +// +// Refer: https://github.com/facebook/mysql-5.6/wiki/MyRocks-record-format#memcomparable-format +func encodeBytes(data []byte) []byte { + // Allocate more space to avoid unnecessary slice growing. + // Assume that the byte slice size is about `(len(data) / encGroupSize + 1) * (encGroupSize + 1)` bytes, + // that is `(len(data) / 8 + 1) * 9` in our implement. + dLen := len(data) + result := make([]byte, 0, (dLen/encGroupSize+1)*(encGroupSize+1)) + for idx := 0; idx <= dLen; idx += encGroupSize { + remain := dLen - idx + padCount := 0 + if remain >= encGroupSize { + result = append(result, data[idx:idx+encGroupSize]...) 
+ } else { + padCount = encGroupSize - remain + result = append(result, data[idx:]...) + result = append(result, pads[:padCount]...) + } + + marker := encMarker - byte(padCount) + result = append(result, marker) + } + return result +} + +func decodeBytes(b []byte) ([]byte, error) { + buf := make([]byte, 0, len(b)) + for { + if len(b) < encGroupSize+1 { + return nil, errors.New("insufficient bytes to decode value") + } + + groupBytes := b[:encGroupSize+1] + + group := groupBytes[:encGroupSize] + marker := groupBytes[encGroupSize] + + padCount := encMarker - marker + if padCount > encGroupSize { + return nil, errors.Errorf("invalid marker byte, group bytes %q", groupBytes) + } + + realGroupSize := encGroupSize - padCount + buf = append(buf, group[:realGroupSize]...) + b = b[encGroupSize+1:] + + if padCount != 0 { + // Check validity of padding bytes. + for _, v := range group[realGroupSize:] { + if v != encPad { + return nil, errors.Errorf("invalid padding byte, group bytes %q", groupBytes) + } + } + break + } + } + return buf, nil +} + +// rawKeyToKeyHexStr converts a raw key to a hex string after encoding. +func rawKeyToKeyHexStr(key []byte) string { + if len(key) == 0 { + return "" + } + return hex.EncodeToString(encodeBytes(key)) +} + +// keyHexStrToRawKey converts a hex string to a raw key after decoding. +func keyHexStrToRawKey(hexKey string) ([]byte, error) { + if len(hexKey) == 0 { + return make([]byte, 0), nil + } + key, err := hex.DecodeString(hexKey) + if err != nil { + return nil, err + } + return decodeBytes(key) +} diff --git a/client/http/codec_test.go b/client/http/codec_test.go new file mode 100644 index 00000000000..fa8d413a0d1 --- /dev/null +++ b/client/http/codec_test.go @@ -0,0 +1,64 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package http + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestBytesCodec(t *testing.T) { + inputs := []struct { + enc []byte + dec []byte + }{ + {[]byte{}, []byte{0, 0, 0, 0, 0, 0, 0, 0, 247}}, + {[]byte{0}, []byte{0, 0, 0, 0, 0, 0, 0, 0, 248}}, + {[]byte{1, 2, 3}, []byte{1, 2, 3, 0, 0, 0, 0, 0, 250}}, + {[]byte{1, 2, 3, 0}, []byte{1, 2, 3, 0, 0, 0, 0, 0, 251}}, + {[]byte{1, 2, 3, 4, 5, 6, 7}, []byte{1, 2, 3, 4, 5, 6, 7, 0, 254}}, + {[]byte{0, 0, 0, 0, 0, 0, 0, 0}, []byte{0, 0, 0, 0, 0, 0, 0, 0, 255, 0, 0, 0, 0, 0, 0, 0, 0, 247}}, + {[]byte{1, 2, 3, 4, 5, 6, 7, 8}, []byte{1, 2, 3, 4, 5, 6, 7, 8, 255, 0, 0, 0, 0, 0, 0, 0, 0, 247}}, + {[]byte{1, 2, 3, 4, 5, 6, 7, 8, 9}, []byte{1, 2, 3, 4, 5, 6, 7, 8, 255, 9, 0, 0, 0, 0, 0, 0, 0, 248}}, + } + + for _, input := range inputs { + b := encodeBytes(input.enc) + require.Equal(t, input.dec, b) + + d, err := decodeBytes(b) + require.NoError(t, err) + require.Equal(t, input.enc, d) + } + + // Test error decode. 
+ errInputs := [][]byte{ + {1, 2, 3, 4}, + {0, 0, 0, 0, 0, 0, 0, 247}, + {0, 0, 0, 0, 0, 0, 0, 0, 246}, + {0, 0, 0, 0, 0, 0, 0, 1, 247}, + {1, 2, 3, 4, 5, 6, 7, 8, 0}, + {1, 2, 3, 4, 5, 6, 7, 8, 255, 1}, + {1, 2, 3, 4, 5, 6, 7, 8, 255, 1, 2, 3, 4, 5, 6, 7, 8}, + {1, 2, 3, 4, 5, 6, 7, 8, 255, 1, 2, 3, 4, 5, 6, 7, 8, 255}, + {1, 2, 3, 4, 5, 6, 7, 8, 255, 1, 2, 3, 4, 5, 6, 7, 8, 0}, + } + + for _, input := range errInputs { + _, err := decodeBytes(input) + require.Error(t, err) + } +} diff --git a/client/http/types.go b/client/http/types.go index 4e99d911e0b..1d8db36d100 100644 --- a/client/http/types.go +++ b/client/http/types.go @@ -341,6 +341,86 @@ func (r *Rule) Clone() *Rule { return &clone } +var ( + _ json.Marshaler = (*Rule)(nil) + _ json.Unmarshaler = (*Rule)(nil) +) + +// This is a helper struct used to customizing the JSON marshal/unmarshal methods of `Rule`. +type rule struct { + GroupID string `json:"group_id"` + ID string `json:"id"` + Index int `json:"index,omitempty"` + Override bool `json:"override,omitempty"` + StartKeyHex string `json:"start_key"` + EndKeyHex string `json:"end_key"` + Role PeerRoleType `json:"role"` + IsWitness bool `json:"is_witness"` + Count int `json:"count"` + LabelConstraints []LabelConstraint `json:"label_constraints,omitempty"` + LocationLabels []string `json:"location_labels,omitempty"` + IsolationLevel string `json:"isolation_level,omitempty"` +} + +// MarshalJSON implements `json.Marshaler` interface to make sure we could set the correct start/end key. +func (r *Rule) MarshalJSON() ([]byte, error) { + tempRule := &rule{ + GroupID: r.GroupID, + ID: r.ID, + Index: r.Index, + Override: r.Override, + StartKeyHex: r.StartKeyHex, + EndKeyHex: r.EndKeyHex, + Role: r.Role, + IsWitness: r.IsWitness, + Count: r.Count, + LabelConstraints: r.LabelConstraints, + LocationLabels: r.LocationLabels, + IsolationLevel: r.IsolationLevel, + } + // Converts the start/end key to hex format if the corresponding hex field is empty. + if len(r.StartKey) > 0 && len(r.StartKeyHex) == 0 { + tempRule.StartKeyHex = rawKeyToKeyHexStr(r.StartKey) + } + if len(r.EndKey) > 0 && len(r.EndKeyHex) == 0 { + tempRule.EndKeyHex = rawKeyToKeyHexStr(r.EndKey) + } + return json.Marshal(tempRule) +} + +// UnmarshalJSON implements `json.Unmarshaler` interface to make sure we could get the correct start/end key. +func (r *Rule) UnmarshalJSON(bytes []byte) error { + var tempRule rule + err := json.Unmarshal(bytes, &tempRule) + if err != nil { + return err + } + newRule := Rule{ + GroupID: tempRule.GroupID, + ID: tempRule.ID, + Index: tempRule.Index, + Override: tempRule.Override, + StartKeyHex: tempRule.StartKeyHex, + EndKeyHex: tempRule.EndKeyHex, + Role: tempRule.Role, + IsWitness: tempRule.IsWitness, + Count: tempRule.Count, + LabelConstraints: tempRule.LabelConstraints, + LocationLabels: tempRule.LocationLabels, + IsolationLevel: tempRule.IsolationLevel, + } + newRule.StartKey, err = keyHexStrToRawKey(newRule.StartKeyHex) + if err != nil { + return err + } + newRule.EndKey, err = keyHexStrToRawKey(newRule.EndKeyHex) + if err != nil { + return err + } + *r = newRule + return nil +} + // RuleOpType indicates the operation type type RuleOpType string @@ -364,6 +444,94 @@ func (r RuleOp) String() string { return string(b) } +var ( + _ json.Marshaler = (*RuleOp)(nil) + _ json.Unmarshaler = (*RuleOp)(nil) +) + +// This is a helper struct used to customizing the JSON marshal/unmarshal methods of `RuleOp`. 
+type ruleOp struct { + GroupID string `json:"group_id"` + ID string `json:"id"` + Index int `json:"index,omitempty"` + Override bool `json:"override,omitempty"` + StartKeyHex string `json:"start_key"` + EndKeyHex string `json:"end_key"` + Role PeerRoleType `json:"role"` + IsWitness bool `json:"is_witness"` + Count int `json:"count"` + LabelConstraints []LabelConstraint `json:"label_constraints,omitempty"` + LocationLabels []string `json:"location_labels,omitempty"` + IsolationLevel string `json:"isolation_level,omitempty"` + Action RuleOpType `json:"action"` + DeleteByIDPrefix bool `json:"delete_by_id_prefix"` +} + +// MarshalJSON implements `json.Marshaler` interface to make sure we could set the correct start/end key. +func (r *RuleOp) MarshalJSON() ([]byte, error) { + tempRuleOp := &ruleOp{ + GroupID: r.GroupID, + ID: r.ID, + Index: r.Index, + Override: r.Override, + StartKeyHex: r.StartKeyHex, + EndKeyHex: r.EndKeyHex, + Role: r.Role, + IsWitness: r.IsWitness, + Count: r.Count, + LabelConstraints: r.LabelConstraints, + LocationLabels: r.LocationLabels, + IsolationLevel: r.IsolationLevel, + Action: r.Action, + DeleteByIDPrefix: r.DeleteByIDPrefix, + } + // Converts the start/end key to hex format if the corresponding hex field is empty. + if len(r.StartKey) > 0 && len(r.StartKeyHex) == 0 { + tempRuleOp.StartKeyHex = rawKeyToKeyHexStr(r.StartKey) + } + if len(r.EndKey) > 0 && len(r.EndKeyHex) == 0 { + tempRuleOp.EndKeyHex = rawKeyToKeyHexStr(r.EndKey) + } + return json.Marshal(tempRuleOp) +} + +// UnmarshalJSON implements `json.Unmarshaler` interface to make sure we could get the correct start/end key. +func (r *RuleOp) UnmarshalJSON(bytes []byte) error { + var tempRuleOp ruleOp + err := json.Unmarshal(bytes, &tempRuleOp) + if err != nil { + return err + } + newRuleOp := RuleOp{ + Rule: &Rule{ + GroupID: tempRuleOp.GroupID, + ID: tempRuleOp.ID, + Index: tempRuleOp.Index, + Override: tempRuleOp.Override, + StartKeyHex: tempRuleOp.StartKeyHex, + EndKeyHex: tempRuleOp.EndKeyHex, + Role: tempRuleOp.Role, + IsWitness: tempRuleOp.IsWitness, + Count: tempRuleOp.Count, + LabelConstraints: tempRuleOp.LabelConstraints, + LocationLabels: tempRuleOp.LocationLabels, + IsolationLevel: tempRuleOp.IsolationLevel, + }, + Action: tempRuleOp.Action, + DeleteByIDPrefix: tempRuleOp.DeleteByIDPrefix, + } + newRuleOp.StartKey, err = keyHexStrToRawKey(newRuleOp.StartKeyHex) + if err != nil { + return err + } + newRuleOp.EndKey, err = keyHexStrToRawKey(newRuleOp.EndKeyHex) + if err != nil { + return err + } + *r = newRuleOp + return nil +} + // RuleGroup defines properties of a rule group. type RuleGroup struct { ID string `json:"id,omitempty"` diff --git a/client/http/types_test.go b/client/http/types_test.go index 0dfebacbdcf..74482e29c3c 100644 --- a/client/http/types_test.go +++ b/client/http/types_test.go @@ -15,6 +15,7 @@ package http import ( + "encoding/json" "testing" "github.com/stretchr/testify/require" @@ -47,3 +48,153 @@ func TestMergeRegionsInfo(t *testing.T) { re.Equal(2, len(regionsInfo.Regions)) re.Equal(append(regionsInfo1.Regions, regionsInfo2.Regions...), regionsInfo.Regions) } + +func TestRuleStartEndKey(t *testing.T) { + re := require.New(t) + // Empty start/end key and key hex. + ruleToMarshal := &Rule{} + rule := mustMarshalAndUnmarshal(re, ruleToMarshal) + re.Equal("", rule.StartKeyHex) + re.Equal("", rule.EndKeyHex) + re.Equal([]byte(""), rule.StartKey) + re.Equal([]byte(""), rule.EndKey) + // Empty start/end key and non-empty key hex. 
+ ruleToMarshal = &Rule{ + StartKeyHex: rawKeyToKeyHexStr([]byte("a")), + EndKeyHex: rawKeyToKeyHexStr([]byte("b")), + } + rule = mustMarshalAndUnmarshal(re, ruleToMarshal) + re.Equal([]byte("a"), rule.StartKey) + re.Equal([]byte("b"), rule.EndKey) + re.Equal(ruleToMarshal.StartKeyHex, rule.StartKeyHex) + re.Equal(ruleToMarshal.EndKeyHex, rule.EndKeyHex) + // Non-empty start/end key and empty key hex. + ruleToMarshal = &Rule{ + StartKey: []byte("a"), + EndKey: []byte("b"), + } + rule = mustMarshalAndUnmarshal(re, ruleToMarshal) + re.Equal(ruleToMarshal.StartKey, rule.StartKey) + re.Equal(ruleToMarshal.EndKey, rule.EndKey) + re.Equal(rawKeyToKeyHexStr(ruleToMarshal.StartKey), rule.StartKeyHex) + re.Equal(rawKeyToKeyHexStr(ruleToMarshal.EndKey), rule.EndKeyHex) + // Non-empty start/end key and non-empty key hex. + ruleToMarshal = &Rule{ + StartKey: []byte("a"), + EndKey: []byte("b"), + StartKeyHex: rawKeyToKeyHexStr([]byte("c")), + EndKeyHex: rawKeyToKeyHexStr([]byte("d")), + } + rule = mustMarshalAndUnmarshal(re, ruleToMarshal) + re.Equal([]byte("c"), rule.StartKey) + re.Equal([]byte("d"), rule.EndKey) + re.Equal(ruleToMarshal.StartKeyHex, rule.StartKeyHex) + re.Equal(ruleToMarshal.EndKeyHex, rule.EndKeyHex) + // Half of each pair of keys is empty. + ruleToMarshal = &Rule{ + StartKey: []byte("a"), + EndKeyHex: rawKeyToKeyHexStr([]byte("d")), + } + rule = mustMarshalAndUnmarshal(re, ruleToMarshal) + re.Equal(ruleToMarshal.StartKey, rule.StartKey) + re.Equal([]byte("d"), rule.EndKey) + re.Equal(rawKeyToKeyHexStr(ruleToMarshal.StartKey), rule.StartKeyHex) + re.Equal(ruleToMarshal.EndKeyHex, rule.EndKeyHex) +} + +func mustMarshalAndUnmarshal(re *require.Assertions, rule *Rule) *Rule { + ruleJSON, err := json.Marshal(rule) + re.NoError(err) + var newRule *Rule + err = json.Unmarshal(ruleJSON, &newRule) + re.NoError(err) + return newRule +} + +func TestRuleOpStartEndKey(t *testing.T) { + re := require.New(t) + // Empty start/end key and key hex. + ruleOpToMarshal := &RuleOp{ + Rule: &Rule{}, + } + ruleOp := mustMarshalAndUnmarshalRuleOp(re, ruleOpToMarshal) + re.Equal("", ruleOp.StartKeyHex) + re.Equal("", ruleOp.EndKeyHex) + re.Equal([]byte(""), ruleOp.StartKey) + re.Equal([]byte(""), ruleOp.EndKey) + // Empty start/end key and non-empty key hex. + ruleOpToMarshal = &RuleOp{ + Rule: &Rule{ + StartKeyHex: rawKeyToKeyHexStr([]byte("a")), + EndKeyHex: rawKeyToKeyHexStr([]byte("b")), + }, + Action: RuleOpAdd, + DeleteByIDPrefix: true, + } + ruleOp = mustMarshalAndUnmarshalRuleOp(re, ruleOpToMarshal) + re.Equal([]byte("a"), ruleOp.StartKey) + re.Equal([]byte("b"), ruleOp.EndKey) + re.Equal(ruleOpToMarshal.StartKeyHex, ruleOp.StartKeyHex) + re.Equal(ruleOpToMarshal.EndKeyHex, ruleOp.EndKeyHex) + re.Equal(ruleOpToMarshal.Action, ruleOp.Action) + re.Equal(ruleOpToMarshal.DeleteByIDPrefix, ruleOp.DeleteByIDPrefix) + // Non-empty start/end key and empty key hex. + ruleOpToMarshal = &RuleOp{ + Rule: &Rule{ + StartKey: []byte("a"), + EndKey: []byte("b"), + }, + Action: RuleOpAdd, + DeleteByIDPrefix: true, + } + ruleOp = mustMarshalAndUnmarshalRuleOp(re, ruleOpToMarshal) + re.Equal(ruleOpToMarshal.StartKey, ruleOp.StartKey) + re.Equal(ruleOpToMarshal.EndKey, ruleOp.EndKey) + re.Equal(rawKeyToKeyHexStr(ruleOpToMarshal.StartKey), ruleOp.StartKeyHex) + re.Equal(rawKeyToKeyHexStr(ruleOpToMarshal.EndKey), ruleOp.EndKeyHex) + re.Equal(ruleOpToMarshal.Action, ruleOp.Action) + re.Equal(ruleOpToMarshal.DeleteByIDPrefix, ruleOp.DeleteByIDPrefix) + // Non-empty start/end key and non-empty key hex. 
+ ruleOpToMarshal = &RuleOp{ + Rule: &Rule{ + StartKey: []byte("a"), + EndKey: []byte("b"), + StartKeyHex: rawKeyToKeyHexStr([]byte("c")), + EndKeyHex: rawKeyToKeyHexStr([]byte("d")), + }, + Action: RuleOpAdd, + DeleteByIDPrefix: true, + } + ruleOp = mustMarshalAndUnmarshalRuleOp(re, ruleOpToMarshal) + re.Equal([]byte("c"), ruleOp.StartKey) + re.Equal([]byte("d"), ruleOp.EndKey) + re.Equal(ruleOpToMarshal.StartKeyHex, ruleOp.StartKeyHex) + re.Equal(ruleOpToMarshal.EndKeyHex, ruleOp.EndKeyHex) + re.Equal(ruleOpToMarshal.Action, ruleOp.Action) + re.Equal(ruleOpToMarshal.DeleteByIDPrefix, ruleOp.DeleteByIDPrefix) + // Half of each pair of keys is empty. + ruleOpToMarshal = &RuleOp{ + Rule: &Rule{ + StartKey: []byte("a"), + EndKeyHex: rawKeyToKeyHexStr([]byte("d")), + }, + Action: RuleOpDel, + DeleteByIDPrefix: false, + } + ruleOp = mustMarshalAndUnmarshalRuleOp(re, ruleOpToMarshal) + re.Equal(ruleOpToMarshal.StartKey, ruleOp.StartKey) + re.Equal([]byte("d"), ruleOp.EndKey) + re.Equal(rawKeyToKeyHexStr(ruleOpToMarshal.StartKey), ruleOp.StartKeyHex) + re.Equal(ruleOpToMarshal.EndKeyHex, ruleOp.EndKeyHex) + re.Equal(ruleOpToMarshal.Action, ruleOp.Action) + re.Equal(ruleOpToMarshal.DeleteByIDPrefix, ruleOp.DeleteByIDPrefix) +} + +func mustMarshalAndUnmarshalRuleOp(re *require.Assertions, ruleOp *RuleOp) *RuleOp { + ruleOpJSON, err := json.Marshal(ruleOp) + re.NoError(err) + var newRuleOp *RuleOp + err = json.Unmarshal(ruleOpJSON, &newRuleOp) + re.NoError(err) + return newRuleOp +} diff --git a/server/api/stats.go b/server/api/stats.go index 1798597b6cc..915d33ddfdf 100644 --- a/server/api/stats.go +++ b/server/api/stats.go @@ -36,8 +36,9 @@ func newStatsHandler(svr *server.Server, rd *render.Render) *statsHandler { // @Tags stats // @Summary Get region statistics of a specified range. -// @Param start_key query string true "Start key" -// @Param end_key query string true "End key" +// @Param start_key query string true "Start key" +// @Param end_key query string true "End key" +// @Param count query bool false "Whether only count the number of regions" // @Produce json // @Success 200 {object} statistics.RegionStats // @Router /stats/region [get] diff --git a/server/api/store.go b/server/api/store.go index a44850d35cc..8537cd45c5b 100644 --- a/server/api/store.go +++ b/server/api/store.go @@ -172,14 +172,14 @@ func newStoreHandler(handler *server.Handler, rd *render.Render) *storeHandler { } } -// @Tags store +// @Tags store // @Summary Get a store's information. // @Param id path integer true "Store Id" -// @Produce json +// @Produce json // @Success 200 {object} StoreInfo // @Failure 400 {string} string "The input is invalid." // @Failure 404 {string} string "The store does not exist." -// @Failure 500 {string} string "PD server failed to proceed the request." +// @Failure 500 {string} string "PD server failed to proceed the request." // @Router /store/{id} [get] func (h *storeHandler) GetStore(w http.ResponseWriter, r *http.Request) { rc := getCluster(r) @@ -735,13 +735,13 @@ func (h *storesHandler) GetStoresProgress(w http.ResponseWriter, r *http.Request } // @Tags store -// @Summary Get all stores in the cluster. -// @Param state query array true "Specify accepted store states." +// @Summary Get all stores in the cluster. +// @Param state query array true "Specify accepted store states." // @Produce json -// @Success 200 {object} StoresInfo +// @Success 200 {object} StoresInfo // @Failure 500 {string} string "PD server failed to proceed the request." 
-// @Router /stores [get] -// @Deprecated Better to use /stores/check instead. +// @Router /stores [get] +// @Deprecated Better to use /stores/check instead. func (h *storesHandler) GetAllStores(w http.ResponseWriter, r *http.Request) { rc := getCluster(r) stores := rc.GetMetaStores() diff --git a/tests/integrations/client/http_client_test.go b/tests/integrations/client/http_client_test.go index a007b893187..6c636d2a2a1 100644 --- a/tests/integrations/client/http_client_test.go +++ b/tests/integrations/client/http_client_test.go @@ -105,9 +105,17 @@ func (suite *httpClientTestSuite) TestMeta() { re.NoError(err) re.Equal(int64(2), regions.Count) re.Len(regions.Regions, 2) - regionStats, err := suite.client.GetRegionStatusByKeyRange(suite.ctx, pd.NewKeyRange([]byte("a1"), []byte("a3"))) + state, err := suite.client.GetRegionsReplicatedStateByKeyRange(suite.ctx, pd.NewKeyRange([]byte("a1"), []byte("a3"))) re.NoError(err) - re.Equal(2, regionStats.Count) + re.Equal("INPROGRESS", state) + regionStats, err := suite.client.GetRegionStatusByKeyRange(suite.ctx, pd.NewKeyRange([]byte("a1"), []byte("a3")), false) + re.NoError(err) + re.Greater(regionStats.Count, 0) + re.NotEmpty(regionStats.StoreLeaderCount) + regionStats, err = suite.client.GetRegionStatusByKeyRange(suite.ctx, pd.NewKeyRange([]byte("a1"), []byte("a3")), true) + re.NoError(err) + re.Greater(regionStats.Count, 0) + re.Empty(regionStats.StoreLeaderCount) hotReadRegions, err := suite.client.GetHotReadRegions(suite.ctx) re.NoError(err) re.Len(hotReadRegions.AsPeer, 1) @@ -170,18 +178,22 @@ func (suite *httpClientTestSuite) TestRule() { re.Equal(bundles[0], bundle) // Check if we have the default rule. suite.checkRule(re, &pd.Rule{ - GroupID: placement.DefaultGroupID, - ID: placement.DefaultRuleID, - Role: pd.Voter, - Count: 3, + GroupID: placement.DefaultGroupID, + ID: placement.DefaultRuleID, + Role: pd.Voter, + Count: 3, + StartKey: []byte{}, + EndKey: []byte{}, }, 1, true) // Should be the same as the rules in the bundle. suite.checkRule(re, bundle.Rules[0], 1, true) testRule := &pd.Rule{ - GroupID: placement.DefaultGroupID, - ID: "test", - Role: pd.Voter, - Count: 3, + GroupID: placement.DefaultGroupID, + ID: "test", + Role: pd.Voter, + Count: 3, + StartKey: []byte{}, + EndKey: []byte{}, } err = suite.client.SetPlacementRule(suite.ctx, testRule) re.NoError(err) @@ -233,6 +245,18 @@ func (suite *httpClientTestSuite) TestRule() { ruleGroup, err = suite.client.GetPlacementRuleGroupByID(suite.ctx, testRuleGroup.ID) re.ErrorContains(err, http.StatusText(http.StatusNotFound)) re.Empty(ruleGroup) + // Test the start key and end key. + testRule = &pd.Rule{ + GroupID: placement.DefaultGroupID, + ID: "test", + Role: pd.Voter, + Count: 5, + StartKey: []byte("a1"), + EndKey: []byte(""), + } + err = suite.client.SetPlacementRule(suite.ctx, testRule) + re.NoError(err) + suite.checkRule(re, testRule, 1, true) } func (suite *httpClientTestSuite) checkRule( @@ -262,6 +286,8 @@ func checkRuleFunc( re.Equal(rule.ID, r.ID) re.Equal(rule.Role, r.Role) re.Equal(rule.Count, r.Count) + re.Equal(rule.StartKey, r.StartKey) + re.Equal(rule.EndKey, r.EndKey) return } if exist {