Skip to content

Commit

Permalink
Merge branch 'master' into close-conn
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Jan 31, 2024
2 parents 313ec06 + 357ad3f commit 05ef674
Show file tree
Hide file tree
Showing 32 changed files with 1,307 additions and 218 deletions.
12 changes: 7 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@ ifneq ($(DASHBOARD_DISTRIBUTION_DIR),)
endif
PD_SERVER_DEP += dashboard-ui

pd-server: ${PD_SERVER_DEP}
pre-build: ${PD_SERVER_DEP}

pd-server: pre-build
GOEXPERIMENT=$(BUILD_GOEXPERIMENT) CGO_ENABLED=$(BUILD_CGO_ENABLED) go build $(BUILD_FLAGS) -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -tags "$(BUILD_TAGS)" -o $(BUILD_BIN_PATH)/pd-server cmd/pd-server/main.go

pd-server-failpoint:
Expand All @@ -103,7 +105,7 @@ pd-server-failpoint:
pd-server-basic:
SWAGGER=0 DASHBOARD=0 $(MAKE) pd-server

.PHONY: build tools pd-server pd-server-basic
.PHONY: pre-build build tools pd-server pd-server-basic

# Tools

Expand Down Expand Up @@ -172,9 +174,9 @@ install-tools:

#### Static checks ####

check: install-tools tidy static generate-errdoc
check: tidy static generate-errdoc

static: install-tools
static: install-tools pre-build
@ echo "gofmt ..."
@ gofmt -s -l -d $(PACKAGE_DIRECTORIES) 2>&1 | awk '{ print } END { if (NR > 0) { exit 1 } }'
@ echo "golangci-lint ..."
Expand Down Expand Up @@ -240,7 +242,7 @@ basic-test: install-tools

ci-test-job: install-tools dashboard-ui
@$(FAILPOINT_ENABLE)
./scripts/ci-subtask.sh $(JOB_COUNT) $(JOB_INDEX)
./scripts/ci-subtask.sh $(JOB_COUNT) $(JOB_INDEX) || { $(FAILPOINT_DISABLE); exit 1; }
@$(FAILPOINT_DISABLE)

TSO_INTEGRATION_TEST_PKGS := $(PD_PKG)/tests/server/tso
Expand Down
17 changes: 2 additions & 15 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ func createClientWithKeyspace(
ctx: clientCtx,
cancel: clientCancel,
keyspaceID: keyspaceID,
svrUrls: addrsToUrls(svrAddrs),
svrUrls: svrAddrs,
tlsCfg: tlsCfg,
option: newOption(),
}
Expand Down Expand Up @@ -500,7 +500,7 @@ func newClientWithKeyspaceName(
updateTokenConnectionCh: make(chan struct{}, 1),
ctx: clientCtx,
cancel: clientCancel,
svrUrls: addrsToUrls(svrAddrs),
svrUrls: svrAddrs,
tlsCfg: tlsCfg,
option: newOption(),
}
Expand Down Expand Up @@ -1388,19 +1388,6 @@ func (c *client) scatterRegionsWithOptions(ctx context.Context, regionsID []uint
return resp, nil
}

func addrsToUrls(addrs []string) []string {
// Add default schema "http://" to addrs.
urls := make([]string, 0, len(addrs))
for _, addr := range addrs {
if strings.Contains(addr, "://") {
urls = append(urls, addr)
} else {
urls = append(urls, "http://"+addr)
}
}
return urls
}

// IsLeaderChange will determine whether there is a leader change.
func IsLeaderChange(err error) bool {
if err == errs.ErrClientTSOStreamClosed {
Expand Down
27 changes: 15 additions & 12 deletions client/pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,18 +480,7 @@ func NewDefaultPDServiceDiscovery(
urls []string, tlsCfg *tls.Config,
) *pdServiceDiscovery {
var wg sync.WaitGroup
pdsd := &pdServiceDiscovery{
checkMembershipCh: make(chan struct{}, 1),
ctx: ctx,
cancel: cancel,
wg: &wg,
apiCandidateNodes: [apiKindCount]*pdServiceBalancer{newPDServiceBalancer(emptyErrorFn), newPDServiceBalancer(regionAPIErrorFn)},
keyspaceID: defaultKeyspaceID,
tlsCfg: tlsCfg,
option: newOption(),
}
pdsd.urls.Store(urls)
return pdsd
return newPDServiceDiscovery(ctx, cancel, &wg, nil, nil, defaultKeyspaceID, urls, tlsCfg, newOption())
}

// newPDServiceDiscovery returns a new PD service discovery-based client.
Expand All @@ -515,6 +504,7 @@ func newPDServiceDiscovery(
tlsCfg: tlsCfg,
option: option,
}
urls = addrsToUrls(urls)
pdsd.urls.Store(urls)
return pdsd
}
Expand Down Expand Up @@ -1155,3 +1145,16 @@ func (c *pdServiceDiscovery) switchTSOAllocatorLeaders(allocatorMap map[string]*
func (c *pdServiceDiscovery) GetOrCreateGRPCConn(addr string) (*grpc.ClientConn, error) {
return grpcutil.GetOrCreateGRPCConn(c.ctx, &c.clientConns, addr, c.tlsCfg, c.option.gRPCDialOptions...)
}

func addrsToUrls(addrs []string) []string {
// Add default schema "http://" to addrs.
urls := make([]string, 0, len(addrs))
for _, addr := range addrs {
if strings.Contains(addr, "://") {
urls = append(urls, addr)
} else {
urls = append(urls, "http://"+addr)
}
}
return urls
}
188 changes: 185 additions & 3 deletions metrics/grafana/pd.json
Original file line number Diff line number Diff line change
Expand Up @@ -10066,6 +10066,188 @@
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "${DS_TEST-CLUSTER}",
"description": "The time consumed of etcd endpoint health check in .99",
"editable": true,
"error": false,
"fill": 1,
"fillGradient": 0,
"grid": {},
"gridPos": {
"h": 8,
"w": 24,
"x": 0,
"y": 53
},
"hiddenSeries": false,
"id": 1607,
"legend": {
"alignAsTable": true,
"avg": false,
"current": true,
"hideEmpty": true,
"hideZero": true,
"max": true,
"min": false,
"rightSide": true,
"show": true,
"sort": "current",
"sortDesc": false,
"total": false,
"values": true
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null",
"options": {
"alertThreshold": true
},
"paceLength": 10,
"percentage": false,
"pluginVersion": "8.5.27",
"pointradius": 5,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"editorMode": "code",
"expr": "histogram_quantile(0.99, sum(rate(pd_server_etcd_endpoint_latency_seconds_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=~\"$tidb_cluster.*\", source=\"server-etcd-client\"}[30s])) by (instance, endpoint, le))",
"intervalFactor": 2,
"legendFormat": "{{instance}} -> {{endpoint}}",
"range": true,
"refId": "A",
"step": 4
}
],
"thresholds": [],
"timeRegions": [],
"title": "99% Endpoint health check latency",
"tooltip": {
"msResolution": false,
"shared": true,
"sort": 0,
"value_type": "cumulative"
},
"type": "graph",
"xaxis": {
"mode": "time",
"show": true,
"values": []
},
"yaxes": [
{
"format": "s",
"logBase": 1,
"show": true
},
{
"format": "short",
"logBase": 1,
"show": true
}
],
"yaxis": {
"align": false
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "${DS_TEST-CLUSTER}",
"description": "The state of the endpoint health.",
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 8,
"w": 24,
"x": 0,
"y": 61
},
"hiddenSeries": false,
"id": 1110,
"legend": {
"alignAsTable": true,
"avg": false,
"current": true,
"hideEmpty": true,
"hideZero": true,
"max": false,
"min": false,
"rightSide": true,
"show": true,
"total": false,
"values": true
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null",
"options": {
"alertThreshold": true
},
"paceLength": 10,
"percentage": false,
"pluginVersion": "8.5.27",
"pointradius": 5,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"editorMode": "code",
"expr": "pd_server_etcd_client{k8s_cluster=\"$k8s_cluster\", tidb_cluster=~\"$tidb_cluster.*\", source=\"server-etcd-client\"}",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "{{instance}} - {{type}}",
"range": true,
"refId": "A"
}
],
"thresholds": [],
"timeRegions": [],
"title": "Endpoint health state",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"mode": "time",
"show": true,
"values": []
},
"yaxes": [
{
"format": "none",
"logBase": 1,
"show": true
},
{
"format": "short",
"logBase": 1,
"show": true
}
],
"yaxis": {
"align": false
}
},
{
"aliasColors": {},
"bars": false,
Expand All @@ -10078,7 +10260,7 @@
"h": 8,
"w": 8,
"x": 0,
"y": 53
"y": 69
},
"id": 1109,
"legend": {
Expand Down Expand Up @@ -10169,7 +10351,7 @@
"h": 8,
"w": 8,
"x": 8,
"y": 53
"y": 69
},
"id": 1110,
"legend": {
Expand Down Expand Up @@ -10261,7 +10443,7 @@
"h": 8,
"w": 8,
"x": 16,
"y": 53
"y": 69
},
"id": 1111,
"legend": {
Expand Down
7 changes: 6 additions & 1 deletion pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/schedulingpb"
"github.com/pingcap/log"
Expand Down Expand Up @@ -454,7 +455,11 @@ func (c *Cluster) runCoordinator() {
defer logutil.LogPanic()
defer c.wg.Done()
// force wait for 1 minute to make prepare checker won't be directly skipped
c.coordinator.RunUntilStop(collectWaitTime)
runCollectWaitTime := collectWaitTime
failpoint.Inject("changeRunCollectWaitTime", func() {
runCollectWaitTime = 1 * time.Second
})
c.coordinator.RunUntilStop(runCollectWaitTime)
}

func (c *Cluster) runMetricsCollectionJob() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func InitClient(s server) error {
if err != nil {
return err
}
etcdClient, err := etcdutil.CreateEtcdClient(tlsConfig, backendUrls)
etcdClient, err := etcdutil.CreateEtcdClient(tlsConfig, backendUrls, "mcs-etcd-client")
if err != nil {
return err
}
Expand Down
4 changes: 1 addition & 3 deletions pkg/schedule/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,13 +553,11 @@ type SchedulerConfig struct {
var DefaultSchedulers = SchedulerConfigs{
{Type: "balance-region"},
{Type: "balance-leader"},
{Type: "balance-witness"},
{Type: "hot-region"},
{Type: "transfer-witness-leader"},
{Type: "evict-slow-store"},
}

// IsDefaultScheduler checks whether the scheduler is enable by default.
// IsDefaultScheduler checks whether the scheduler is enabled by default.
func IsDefaultScheduler(typ string) bool {
for _, c := range DefaultSchedulers {
if typ == c.Type {
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/schedulers/scheduler_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type Controller struct {
ctx context.Context
cluster sche.SchedulerCluster
storage endpoint.ConfigStorage
// schedulers is used to manage all schedulers, which will only be initialized
// schedulers are used to manage all schedulers, which will only be initialized
// and used in the PD leader service mode now.
schedulers map[string]*ScheduleController
// schedulerHandlers is used to manage the HTTP handlers of schedulers,
Expand Down
Loading

0 comments on commit 05ef674

Please sign in to comment.