From 08e897823fbe9f1d7244b2468d1689102638d773 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Wed, 31 May 2023 18:23:56 +0800 Subject: [PATCH] add scheduling microservice framework Signed-off-by: Ryan Leung --- pkg/mcs/resourcemanager/server/server.go | 2 +- pkg/mcs/scheduling/server/apis/v1/api.go | 92 ++++ pkg/mcs/scheduling/server/config.go | 481 +++++++++++++++++++ pkg/mcs/scheduling/server/config_test.go | 42 ++ pkg/mcs/scheduling/server/grpc_service.go | 71 +++ pkg/mcs/scheduling/server/metrics.go | 37 ++ pkg/mcs/scheduling/server/server.go | 552 ++++++++++++++++++++++ pkg/mcs/utils/constant.go | 2 + pkg/schedule/config/config.go | 1 + server/cluster/cluster.go | 1 + 10 files changed, 1280 insertions(+), 1 deletion(-) create mode 100644 pkg/mcs/scheduling/server/apis/v1/api.go create mode 100644 pkg/mcs/scheduling/server/config.go create mode 100644 pkg/mcs/scheduling/server/config_test.go create mode 100644 pkg/mcs/scheduling/server/grpc_service.go create mode 100644 pkg/mcs/scheduling/server/metrics.go create mode 100644 pkg/mcs/scheduling/server/server.go diff --git a/pkg/mcs/resourcemanager/server/server.go b/pkg/mcs/resourcemanager/server/server.go index 6705c4b1da9e..5cfbf22c6884 100644 --- a/pkg/mcs/resourcemanager/server/server.go +++ b/pkg/mcs/resourcemanager/server/server.go @@ -456,7 +456,7 @@ func CreateServerWrapper(cmd *cobra.Command, args []string) { // Flushing any buffered log entries defer log.Sync() - versioninfo.Log("resource manager") + versioninfo.Log("Resource Manager") log.Info("resource manager config", zap.Reflect("config", cfg)) grpcprometheus.EnableHandlingTimeHistogram() diff --git a/pkg/mcs/scheduling/server/apis/v1/api.go b/pkg/mcs/scheduling/server/apis/v1/api.go new file mode 100644 index 000000000000..108a44a3d72f --- /dev/null +++ b/pkg/mcs/scheduling/server/apis/v1/api.go @@ -0,0 +1,92 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package apis + +import ( + "net/http" + "sync" + + "github.com/gin-contrib/cors" + "github.com/gin-contrib/gzip" + "github.com/gin-gonic/gin" + "github.com/joho/godotenv" + scheserver "github.com/tikv/pd/pkg/mcs/scheduling/server" + "github.com/tikv/pd/pkg/mcs/utils" + "github.com/tikv/pd/pkg/utils/apiutil" + "github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi" + "github.com/unrolled/render" +) + +// APIPathPrefix is the prefix of the API path. +const APIPathPrefix = "/scheduling/api/v1/" + +var ( + once sync.Once + apiServiceGroup = apiutil.APIServiceGroup{ + Name: "scheduling", + Version: "v1", + IsCore: false, + PathPrefix: APIPathPrefix, + } +) + +func init() { + scheserver.SetUpRestHandler = func(srv *scheserver.Service) (http.Handler, apiutil.APIServiceGroup) { + s := NewService(srv) + return s.apiHandlerEngine, apiServiceGroup + } +} + +// Service is the tso service. 
+type Service struct { + apiHandlerEngine *gin.Engine + root *gin.RouterGroup + + srv *scheserver.Service + rd *render.Render +} + +func createIndentRender() *render.Render { + return render.New(render.Options{ + IndentJSON: true, + }) +} + +// NewService returns a new Service. +func NewService(srv *scheserver.Service) *Service { + once.Do(func() { + // These global modification will be effective only for the first invoke. + _ = godotenv.Load() + gin.SetMode(gin.ReleaseMode) + }) + apiHandlerEngine := gin.New() + apiHandlerEngine.Use(gin.Recovery()) + apiHandlerEngine.Use(cors.Default()) + apiHandlerEngine.Use(gzip.Gzip(gzip.DefaultCompression)) + apiHandlerEngine.Use(func(c *gin.Context) { + c.Set(multiservicesapi.ServiceContextKey, srv) + c.Next() + }) + apiHandlerEngine.Use(multiservicesapi.ServiceRedirector()) + apiHandlerEngine.GET("metrics", utils.PromHandler()) + root := apiHandlerEngine.Group(APIPathPrefix) + s := &Service{ + srv: srv, + apiHandlerEngine: apiHandlerEngine, + root: root, + rd: createIndentRender(), + } + return s +} diff --git a/pkg/mcs/scheduling/server/config.go b/pkg/mcs/scheduling/server/config.go new file mode 100644 index 000000000000..0564c5b17c0e --- /dev/null +++ b/pkg/mcs/scheduling/server/config.go @@ -0,0 +1,481 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "fmt" + "os" + "path/filepath" + "strings" + "time" + + "github.com/BurntSushi/toml" + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/spf13/pflag" + "github.com/tikv/pd/pkg/mcs/utils" + "github.com/tikv/pd/pkg/utils/configutil" + "github.com/tikv/pd/pkg/utils/grpcutil" + "github.com/tikv/pd/pkg/utils/metricutil" + "github.com/tikv/pd/pkg/utils/typeutil" + "go.uber.org/zap" +) + +const ( + defaultName = "Scheduling" + defaultBackendEndpoints = "http://127.0.0.1:2379" + defaultListenAddr = "http://127.0.0.1:3379" +) + +// SchedulerConfigs is a slice of customized scheduler configuration. +type SchedulerConfigs []SchedulerConfig + +// SchedulerConfig is customized scheduler configuration +type SchedulerConfig struct { + Type string `toml:"type" json:"type"` + Args []string `toml:"args" json:"args"` + Disable bool `toml:"disable" json:"disable"` + ArgsPayload string `toml:"args-payload" json:"args-payload"` +} + +// DefaultSchedulers are the schedulers be created by default. +// If these schedulers are not in the persistent configuration, they +// will be created automatically when reloading. 
+var DefaultSchedulers = SchedulerConfigs{
+	{Type: "balance-region"},
+	{Type: "balance-leader"},
+	{Type: "hot-region"},
+}
+
+const (
+	defaultMaxReplicas            = 3
+	defaultMaxSnapshotCount       = 64
+	defaultMaxPendingPeerCount    = 64
+	defaultMaxMergeRegionSize     = 20
+	defaultSplitMergeInterval     = time.Hour
+	defaultSwitchWitnessInterval  = time.Hour
+	defaultEnableDiagnostic       = true
+	defaultPatrolRegionInterval   = 10 * time.Millisecond
+	defaultMaxStoreDownTime       = 30 * time.Minute
+	defaultLeaderScheduleLimit    = 4
+	defaultRegionScheduleLimit    = 2048
+	defaultWitnessScheduleLimit   = 4
+	defaultReplicaScheduleLimit   = 64
+	defaultMergeScheduleLimit     = 8
+	defaultHotRegionScheduleLimit = 4
+	defaultTolerantSizeRatio      = 0
+	defaultLowSpaceRatio          = 0.8
+	defaultHighSpaceRatio         = 0.7
+	defaultRegionScoreFormulaVersion = "v2"
+	// defaultHotRegionCacheHitsThreshold is the low hit number threshold of the
+	// hot region.
+	defaultHotRegionCacheHitsThreshold = 3
+	defaultSchedulerMaxWaitingOperator = 5
+	defaultLeaderSchedulePolicy        = "count"
+	defaultStoreLimitMode              = "manual"
+	defaultEnableJointConsensus        = true
+	defaultEnableTiKVSplitRegion       = true
+	defaultEnableCrossTableMerge       = true
+	defaultHotRegionsWriteInterval     = 10 * time.Minute
+	defaultHotRegionsReservedDays      = 7
+	// It means we skip the preparing stage after 48 hours, no matter whether the store has finished its preparing stage.
+	defaultMaxStorePreparingTime = 48 * time.Hour
+	// When a slow store affects more than 30% of total stores, it will trigger evicting.
+	defaultSlowStoreEvictingAffectedStoreRatioThreshold = 0.3
+
+	defaultStoreLimitVersion = "v1"
+	defaultEnableWitness     = false
+	defaultHaltScheduling    = false
+)
+
+// Config is the configuration for the scheduling server.
+type Config struct {
+	BackendEndpoints    string `toml:"backend-endpoints" json:"backend-endpoints"`
+	ListenAddr          string `toml:"listen-addr" json:"listen-addr"`
+	AdvertiseListenAddr string `toml:"advertise-listen-addr" json:"advertise-listen-addr"`
+	Name                string `toml:"name" json:"name"`
+	DataDir             string `toml:"data-dir" json:"data-dir"` // TODO: remove this after refactoring
+	EnableGRPCGateway   bool   `json:"enable-grpc-gateway"`      // TODO: use it
+
+	Metric metricutil.MetricConfig `toml:"metric" json:"metric"`
+
+	// Log related config.
+	Log      log.Config `toml:"log" json:"log"`
+	Logger   *zap.Logger
+	LogProps *log.ZapProperties
+
+	Security configutil.SecurityConfig `toml:"security" json:"security"`
+
+	// LeaderLease defines the time within which a scheduling primary/leader must
+	// update its TTL in etcd; otherwise etcd will expire the leader key and other servers
+	// can campaign for the primary/leader again. Etcd only supports TTLs in seconds, so the
+	// lease here is in seconds too.
+	LeaderLease int64 `toml:"lease" json:"lease"`
+
+	// If the snapshot count of one store is greater than this value,
+	// it will never be used as a source or target store.
+	MaxSnapshotCount    uint64 `toml:"max-snapshot-count" json:"max-snapshot-count"`
+	MaxPendingPeerCount uint64 `toml:"max-pending-peer-count" json:"max-pending-peer-count"`
+	// If both the size of region is smaller than MaxMergeRegionSize
+	// and the number of rows in region is smaller than MaxMergeRegionKeys,
+	// it will try to merge with adjacent regions.
+	MaxMergeRegionSize uint64 `toml:"max-merge-region-size" json:"max-merge-region-size"`
+	MaxMergeRegionKeys uint64 `toml:"max-merge-region-keys" json:"max-merge-region-keys"`
+	// SplitMergeInterval is the minimum interval time to permit merge after split.
+ SplitMergeInterval typeutil.Duration `toml:"split-merge-interval" json:"split-merge-interval"` + // SwitchWitnessInterval is the minimum interval that allows a peer to become a witness again after it is promoted to non-witness. + SwitchWitnessInterval typeutil.Duration `toml:"switch-witness-interval" json:"swtich-witness-interval"` + // EnableOneWayMerge is the option to enable one way merge. This means a Region can only be merged into the next region of it. + EnableOneWayMerge bool `toml:"enable-one-way-merge" json:"enable-one-way-merge,string"` + // EnableCrossTableMerge is the option to enable cross table merge. This means two Regions can be merged with different table IDs. + // This option only works when key type is "table". + EnableCrossTableMerge bool `toml:"enable-cross-table-merge" json:"enable-cross-table-merge,string"` + // PatrolRegionInterval is the interval for scanning region during patrol. + PatrolRegionInterval typeutil.Duration `toml:"patrol-region-interval" json:"patrol-region-interval"` + // MaxStoreDownTime is the max duration after which + // a store will be considered to be down if it hasn't reported heartbeats. + MaxStoreDownTime typeutil.Duration `toml:"max-store-down-time" json:"max-store-down-time"` + // MaxStorePreparingTime is the max duration after which + // a store will be considered to be preparing. + MaxStorePreparingTime typeutil.Duration `toml:"max-store-preparing-time" json:"max-store-preparing-time"` + // LeaderScheduleLimit is the max coexist leader schedules. + LeaderScheduleLimit uint64 `toml:"leader-schedule-limit" json:"leader-schedule-limit"` + // LeaderSchedulePolicy is the option to balance leader, there are some policies supported: ["count", "size"], default: "count" + LeaderSchedulePolicy string `toml:"leader-schedule-policy" json:"leader-schedule-policy"` + // RegionScheduleLimit is the max coexist region schedules. + RegionScheduleLimit uint64 `toml:"region-schedule-limit" json:"region-schedule-limit"` + // WitnessScheduleLimit is the max coexist witness schedules. + WitnessScheduleLimit uint64 `toml:"witness-schedule-limit" json:"witness-schedule-limit"` + // ReplicaScheduleLimit is the max coexist replica schedules. + ReplicaScheduleLimit uint64 `toml:"replica-schedule-limit" json:"replica-schedule-limit"` + // MergeScheduleLimit is the max coexist merge schedules. + MergeScheduleLimit uint64 `toml:"merge-schedule-limit" json:"merge-schedule-limit"` + // HotRegionScheduleLimit is the max coexist hot region schedules. + HotRegionScheduleLimit uint64 `toml:"hot-region-schedule-limit" json:"hot-region-schedule-limit"` + // HotRegionCacheHitThreshold is the cache hits threshold of the hot region. + // If the number of times a region hits the hot cache is greater than this + // threshold, it is considered a hot region. + HotRegionCacheHitsThreshold uint64 `toml:"hot-region-cache-hits-threshold" json:"hot-region-cache-hits-threshold"` + + // StoreLimit is the limit of scheduling for stores. + StoreLimit map[uint64]StoreLimitConfig `toml:"store-limit" json:"store-limit"` + // TolerantSizeRatio is the ratio of buffer size for balance scheduler. + TolerantSizeRatio float64 `toml:"tolerant-size-ratio" json:"tolerant-size-ratio"` + // + // high space stage transition stage low space stage + // |--------------------|-----------------------------|-------------------------| + // ^ ^ ^ ^ + // 0 HighSpaceRatio * capacity LowSpaceRatio * capacity capacity + // + // LowSpaceRatio is the lowest usage ratio of store which regraded as low space. 
+ // When in low space, store region score increases to very large and varies inversely with available size. + LowSpaceRatio float64 `toml:"low-space-ratio" json:"low-space-ratio"` + // HighSpaceRatio is the highest usage ratio of store which regraded as high space. + // High space means there is a lot of spare capacity, and store region score varies directly with used size. + HighSpaceRatio float64 `toml:"high-space-ratio" json:"high-space-ratio"` + // RegionScoreFormulaVersion is used to control the formula used to calculate region score. + RegionScoreFormulaVersion string `toml:"region-score-formula-version" json:"region-score-formula-version"` + // SchedulerMaxWaitingOperator is the max coexist operators for each scheduler. + SchedulerMaxWaitingOperator uint64 `toml:"scheduler-max-waiting-operator" json:"scheduler-max-waiting-operator"` + + // EnableRemoveDownReplica is the option to enable replica checker to remove down replica. + EnableRemoveDownReplica bool `toml:"enable-remove-down-replica" json:"enable-remove-down-replica,string"` + // EnableReplaceOfflineReplica is the option to enable replica checker to replace offline replica. + EnableReplaceOfflineReplica bool `toml:"enable-replace-offline-replica" json:"enable-replace-offline-replica,string"` + // EnableMakeUpReplica is the option to enable replica checker to make up replica. + EnableMakeUpReplica bool `toml:"enable-make-up-replica" json:"enable-make-up-replica,string"` + // EnableRemoveExtraReplica is the option to enable replica checker to remove extra replica. + EnableRemoveExtraReplica bool `toml:"enable-remove-extra-replica" json:"enable-remove-extra-replica,string"` + // EnableLocationReplacement is the option to enable replica checker to move replica to a better location. + EnableLocationReplacement bool `toml:"enable-location-replacement" json:"enable-location-replacement,string"` + // EnableDebugMetrics is the option to enable debug metrics. + EnableDebugMetrics bool `toml:"enable-debug-metrics" json:"enable-debug-metrics,string"` + // EnableJointConsensus is the option to enable using joint consensus as a operator step. + EnableJointConsensus bool `toml:"enable-joint-consensus" json:"enable-joint-consensus,string"` + // EnableTiKVSplitRegion is the option to enable tikv split region. + // on ebs-based BR we need to disable it with TTL + EnableTiKVSplitRegion bool `toml:"enable-tikv-split-region" json:"enable-tikv-split-region,string"` + + // Schedulers support for loading customized schedulers + Schedulers SchedulerConfigs `toml:"schedulers" json:"schedulers-v2"` // json v2 is for the sake of compatible upgrade + + // Controls the time interval between write hot regions info into leveldb. + HotRegionsWriteInterval typeutil.Duration `toml:"hot-regions-write-interval" json:"hot-regions-write-interval"` + + // The day of hot regions data to be reserved. 0 means close. + HotRegionsReservedDays uint64 `toml:"hot-regions-reserved-days" json:"hot-regions-reserved-days"` + + // MaxMovableHotPeerSize is the threshold of region size for balance hot region and split bucket scheduler. + // Hot region must be split before moved if it's region size is greater than MaxMovableHotPeerSize. 
+	MaxMovableHotPeerSize int64 `toml:"max-movable-hot-peer-size" json:"max-movable-hot-peer-size,omitempty"`
+
+	// EnableDiagnostic is the option to enable using diagnostic.
+	EnableDiagnostic bool `toml:"enable-diagnostic" json:"enable-diagnostic,string"`
+
+	// EnableWitness is the option to enable using witness
+	EnableWitness bool `toml:"enable-witness" json:"enable-witness,string"`
+
+	// SlowStoreEvictingAffectedStoreRatioThreshold is the affected ratio threshold when judging whether a store is slow.
+	// A store's slowness must affect more than `store-count * SlowStoreEvictingAffectedStoreRatioThreshold` stores to trigger evicting.
+	SlowStoreEvictingAffectedStoreRatioThreshold float64 `toml:"slow-store-evicting-affected-store-ratio-threshold" json:"slow-store-evicting-affected-store-ratio-threshold,omitempty"`
+
+	// StoreLimitVersion is the version of store limit.
+	// v1: which is based on the region count by rate limit.
+	// v2: which is based on region size by window size.
+	StoreLimitVersion string `toml:"store-limit-version" json:"store-limit-version,omitempty"`
+
+	// HaltScheduling is the option to halt the scheduling. Once it's on, PD will halt the scheduling,
+	// and any other scheduling configs will be ignored.
+	HaltScheduling bool `toml:"halt-scheduling" json:"halt-scheduling,string,omitempty"`
+}
+
+// StoreLimitConfig is a config about scheduling rate limit of different types for a store.
+type StoreLimitConfig struct {
+	AddPeer    float64 `toml:"add-peer" json:"add-peer"`
+	RemovePeer float64 `toml:"remove-peer" json:"remove-peer"`
+}
+
+func adjustSchedulers(v *SchedulerConfigs, defValue SchedulerConfigs) {
+	if len(*v) == 0 {
+		// Make a copy to avoid changing DefaultSchedulers unexpectedly.
+		// When reloading from storage, the config is passed to json.Unmarshal.
+		// Without clone, the DefaultSchedulers could be overwritten.
+		*v = append(defValue[:0:0], defValue...)
+	}
+}
+
+// NewConfig creates a new config.
+func NewConfig() *Config {
+	return &Config{}
+}
+
+// Parse parses flag definitions from the argument list.
+func (c *Config) Parse(flagSet *pflag.FlagSet) error {
+	// Load config file if specified.
+	var (
+		meta *toml.MetaData
+		err  error
+	)
+	if configFile, _ := flagSet.GetString("config"); configFile != "" {
+		meta, err = configutil.ConfigFromFile(c, configFile)
+		if err != nil {
+			return err
+		}
+	}
+
+	// Ignore the error check here
+	configutil.AdjustCommandlineString(flagSet, &c.Log.Level, "log-level")
+	configutil.AdjustCommandlineString(flagSet, &c.Log.File.Filename, "log-file")
+	configutil.AdjustCommandlineString(flagSet, &c.Metric.PushAddress, "metrics-addr")
+	configutil.AdjustCommandlineString(flagSet, &c.Security.CAPath, "cacert")
+	configutil.AdjustCommandlineString(flagSet, &c.Security.CertPath, "cert")
+	configutil.AdjustCommandlineString(flagSet, &c.Security.KeyPath, "key")
+	configutil.AdjustCommandlineString(flagSet, &c.BackendEndpoints, "backend-endpoints")
+	configutil.AdjustCommandlineString(flagSet, &c.ListenAddr, "listen-addr")
+	configutil.AdjustCommandlineString(flagSet, &c.AdvertiseListenAddr, "advertise-listen-addr")
+
+	return c.Adjust(meta, false)
+}
+
+// Adjust is used to adjust the scheduling configurations.
+func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error { + configMetaData := configutil.NewConfigMetadata(meta) + warningMsgs := make([]string, 0) + if err := configMetaData.CheckUndecoded(); err != nil { + warningMsgs = append(warningMsgs, err.Error()) + } + configutil.PrintConfigCheckMsg(os.Stdout, warningMsgs) + + if c.Name == "" { + hostname, err := os.Hostname() + if err != nil { + return err + } + configutil.AdjustString(&c.Name, fmt.Sprintf("%s-%s", defaultName, hostname)) + } + configutil.AdjustString(&c.DataDir, fmt.Sprintf("default.%s", c.Name)) + configutil.AdjustPath(&c.DataDir) + + if err := c.Validate(); err != nil { + return err + } + + configutil.AdjustString(&c.BackendEndpoints, defaultBackendEndpoints) + configutil.AdjustString(&c.ListenAddr, defaultListenAddr) + configutil.AdjustString(&c.AdvertiseListenAddr, c.ListenAddr) + + if !configMetaData.IsDefined("enable-grpc-gateway") { + c.EnableGRPCGateway = utils.DefaultEnableGRPCGateway + } + + c.adjustLog(configMetaData.Child("log")) + c.Security.Encryption.Adjust() + + if len(c.Log.Format) == 0 { + c.Log.Format = utils.DefaultLogFormat + } + + configutil.AdjustInt64(&c.LeaderLease, utils.DefaultLeaderLease) + + // adjust scheduling config + if !meta.IsDefined("max-snapshot-count") { + configutil.AdjustUint64(&c.MaxSnapshotCount, defaultMaxSnapshotCount) + } + if !meta.IsDefined("max-pending-peer-count") { + configutil.AdjustUint64(&c.MaxPendingPeerCount, defaultMaxPendingPeerCount) + } + if !meta.IsDefined("max-merge-region-size") { + configutil.AdjustUint64(&c.MaxMergeRegionSize, defaultMaxMergeRegionSize) + } + configutil.AdjustDuration(&c.SplitMergeInterval, defaultSplitMergeInterval) + configutil.AdjustDuration(&c.SwitchWitnessInterval, defaultSwitchWitnessInterval) + configutil.AdjustDuration(&c.PatrolRegionInterval, defaultPatrolRegionInterval) + configutil.AdjustDuration(&c.MaxStoreDownTime, defaultMaxStoreDownTime) + configutil.AdjustDuration(&c.HotRegionsWriteInterval, defaultHotRegionsWriteInterval) + configutil.AdjustDuration(&c.MaxStorePreparingTime, defaultMaxStorePreparingTime) + if !meta.IsDefined("leader-schedule-limit") { + configutil.AdjustUint64(&c.LeaderScheduleLimit, defaultLeaderScheduleLimit) + } + if !meta.IsDefined("region-schedule-limit") { + configutil.AdjustUint64(&c.RegionScheduleLimit, defaultRegionScheduleLimit) + } + if !meta.IsDefined("witness-schedule-limit") { + configutil.AdjustUint64(&c.WitnessScheduleLimit, defaultWitnessScheduleLimit) + } + if !meta.IsDefined("replica-schedule-limit") { + configutil.AdjustUint64(&c.ReplicaScheduleLimit, defaultReplicaScheduleLimit) + } + if !meta.IsDefined("merge-schedule-limit") { + configutil.AdjustUint64(&c.MergeScheduleLimit, defaultMergeScheduleLimit) + } + if !meta.IsDefined("hot-region-schedule-limit") { + configutil.AdjustUint64(&c.HotRegionScheduleLimit, defaultHotRegionScheduleLimit) + } + if !meta.IsDefined("hot-region-cache-hits-threshold") { + configutil.AdjustUint64(&c.HotRegionCacheHitsThreshold, defaultHotRegionCacheHitsThreshold) + } + if !meta.IsDefined("tolerant-size-ratio") { + configutil.AdjustFloat64(&c.TolerantSizeRatio, defaultTolerantSizeRatio) + } + if !meta.IsDefined("scheduler-max-waiting-operator") { + configutil.AdjustUint64(&c.SchedulerMaxWaitingOperator, defaultSchedulerMaxWaitingOperator) + } + if !meta.IsDefined("leader-schedule-policy") { + configutil.AdjustString(&c.LeaderSchedulePolicy, defaultLeaderSchedulePolicy) + } + + if !meta.IsDefined("store-limit-version") { + 
configutil.AdjustString(&c.StoreLimitVersion, defaultStoreLimitVersion) + } + + if !meta.IsDefined("enable-joint-consensus") { + c.EnableJointConsensus = defaultEnableJointConsensus + } + if !meta.IsDefined("enable-tikv-split-region") { + c.EnableTiKVSplitRegion = defaultEnableTiKVSplitRegion + } + if !meta.IsDefined("enable-cross-table-merge") { + c.EnableCrossTableMerge = defaultEnableCrossTableMerge + } + configutil.AdjustFloat64(&c.LowSpaceRatio, defaultLowSpaceRatio) + configutil.AdjustFloat64(&c.HighSpaceRatio, defaultHighSpaceRatio) + if !meta.IsDefined("enable-diagnostic") { + c.EnableDiagnostic = defaultEnableDiagnostic + } + + if !meta.IsDefined("enable-witness") { + c.EnableWitness = defaultEnableWitness + } + + // new cluster:v2, old cluster:v1 + if !meta.IsDefined("region-score-formula-version") && !reloading { + configutil.AdjustString(&c.RegionScoreFormulaVersion, defaultRegionScoreFormulaVersion) + } + + if !meta.IsDefined("halt-scheduling") { + c.HaltScheduling = defaultHaltScheduling + } + + adjustSchedulers(&c.Schedulers, DefaultSchedulers) + + if c.StoreLimit == nil { + c.StoreLimit = make(map[uint64]StoreLimitConfig) + } + + if !meta.IsDefined("hot-regions-reserved-days") { + configutil.AdjustUint64(&c.HotRegionsReservedDays, defaultHotRegionsReservedDays) + } + + if !meta.IsDefined("slow-store-evicting-affected-store-ratio-threshold") { + configutil.AdjustFloat64(&c.SlowStoreEvictingAffectedStoreRatioThreshold, defaultSlowStoreEvictingAffectedStoreRatioThreshold) + } + + return c.validateScheduleConfig() +} + +func (c *Config) adjustLog(meta *configutil.ConfigMetaData) { + if !meta.IsDefined("disable-error-verbose") { + c.Log.DisableErrorVerbose = utils.DefaultDisableErrorVerbose + } +} + +// GetTLSConfig returns the TLS config. +func (c *Config) GetTLSConfig() *grpcutil.TLSConfig { + return &c.Security.TLSConfig +} + +// Validate is used to validate if some configurations are right. +func (c *Config) Validate() error { + dataDir, err := filepath.Abs(c.DataDir) + if err != nil { + return errors.WithStack(err) + } + logFile, err := filepath.Abs(c.Log.File.Filename) + if err != nil { + return errors.WithStack(err) + } + rel, err := filepath.Rel(dataDir, filepath.Dir(logFile)) + if err != nil { + return errors.WithStack(err) + } + if !strings.HasPrefix(rel, "..") { + return errors.New("log directory shouldn't be the subdirectory of data directory") + } + + return nil +} + +// validateScheduleConfig is used to validate if some scheduling configurations are right. 
+func (c *Config) validateScheduleConfig() error {
+	if c.TolerantSizeRatio < 0 {
+		return errors.New("tolerant-size-ratio should be non-negative")
+	}
+	if c.LowSpaceRatio < 0 || c.LowSpaceRatio > 1 {
+		return errors.New("low-space-ratio should be between 0 and 1")
+	}
+	if c.HighSpaceRatio < 0 || c.HighSpaceRatio > 1 {
+		return errors.New("high-space-ratio should be between 0 and 1")
+	}
+	if c.LowSpaceRatio <= c.HighSpaceRatio {
+		return errors.New("low-space-ratio should be larger than high-space-ratio")
+	}
+	if c.LeaderSchedulePolicy != "count" && c.LeaderSchedulePolicy != "size" {
+		return errors.Errorf("leader-schedule-policy %v is invalid", c.LeaderSchedulePolicy)
+	}
+	if c.SlowStoreEvictingAffectedStoreRatioThreshold == 0 {
+		return errors.Errorf("slow-store-evicting-affected-store-ratio-threshold is not set")
+	}
+	return nil
+}
diff --git a/pkg/mcs/scheduling/server/config_test.go b/pkg/mcs/scheduling/server/config_test.go
new file mode 100644
index 000000000000..77ccc91eb9f3
--- /dev/null
+++ b/pkg/mcs/scheduling/server/config_test.go
@@ -0,0 +1,42 @@
+// Copyright 2023 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package server
+
+import (
+	"testing"
+
+	"github.com/BurntSushi/toml"
+	"github.com/stretchr/testify/require"
+)
+
+func TestAdjust(t *testing.T) {
+	re := require.New(t)
+	cfgData := `
+max-merge-region-size = 0
+enable-one-way-merge = true
+leader-schedule-limit = 0
+`
+
+	cfg := NewConfig()
+	meta, err := toml.Decode(cfgData, &cfg)
+	re.NoError(err)
+	err = cfg.Adjust(&meta, false)
+	re.NoError(err)
+
+	re.Equal(uint64(0), cfg.MaxMergeRegionSize)
+	re.True(cfg.EnableOneWayMerge)
+	re.Equal(uint64(0), cfg.LeaderScheduleLimit)
+	re.Equal(uint64(0), cfg.MaxMergeRegionKeys)
+}
diff --git a/pkg/mcs/scheduling/server/grpc_service.go b/pkg/mcs/scheduling/server/grpc_service.go
new file mode 100644
index 000000000000..3b0e51f1f66f
--- /dev/null
+++ b/pkg/mcs/scheduling/server/grpc_service.go
@@ -0,0 +1,71 @@
+// Copyright 2023 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package server
+
+import (
+	"net/http"
+
+	"github.com/pingcap/log"
+	bs "github.com/tikv/pd/pkg/basicserver"
+	"github.com/tikv/pd/pkg/mcs/registry"
+	"github.com/tikv/pd/pkg/utils/apiutil"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+// gRPC errors
+var (
+	ErrNotStarted        = status.Errorf(codes.Unavailable, "server not started")
+	ErrClusterMismatched = status.Errorf(codes.Unavailable, "cluster mismatched")
+)
+
+// SetUpRestHandler is a hook to set up the REST service.
+var SetUpRestHandler = func(srv *Service) (http.Handler, apiutil.APIServiceGroup) {
+	return dummyRestService{}, apiutil.APIServiceGroup{}
+}
+
+type dummyRestService struct{}
+
+func (d dummyRestService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	w.WriteHeader(http.StatusNotImplemented)
+	w.Write([]byte("not implemented"))
+}
+
+// Service is the scheduling grpc service.
+type Service struct {
+	*Server
+}
+
+// NewService creates a new scheduling service.
+func NewService(svr bs.Server) registry.RegistrableService {
+	server, ok := svr.(*Server)
+	if !ok {
+		log.Fatal("create scheduling server failed")
+	}
+	return &Service{
+		Server: server,
+	}
+}
+
+// RegisterGRPCService registers the service to gRPC server.
+func (s *Service) RegisterGRPCService(g *grpc.Server) {
+}
+
+// RegisterRESTHandler registers the service to REST server.
+func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler) {
+	handler, group := SetUpRestHandler(s)
+	apiutil.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler)
+}
diff --git a/pkg/mcs/scheduling/server/metrics.go b/pkg/mcs/scheduling/server/metrics.go
new file mode 100644
index 000000000000..b3f5b7b41de7
--- /dev/null
+++ b/pkg/mcs/scheduling/server/metrics.go
@@ -0,0 +1,37 @@
+// Copyright 2023 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package server
+
+import "github.com/prometheus/client_golang/prometheus"
+
+const (
+	namespace       = "scheduling"
+	serverSubsystem = "server"
+)
+
+var (
+	// Meta & Server info.
+	serverInfo = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: serverSubsystem,
+			Name:      "info",
+			Help:      "Indicate the scheduling server info, and the value is the start timestamp (s).",
+		}, []string{"version", "hash"})
+)
+
+func init() {
+	prometheus.MustRegister(serverInfo)
+}
diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go
new file mode 100644
index 000000000000..982e0734966c
--- /dev/null
+++ b/pkg/mcs/scheduling/server/server.go
@@ -0,0 +1,552 @@
+// Copyright 2023 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "context" + "crypto/tls" + "fmt" + "net" + "net/http" + "net/url" + "os" + "os/signal" + "path" + "strconv" + "strings" + "sync" + "sync/atomic" + "syscall" + "time" + + grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "github.com/pingcap/kvproto/pkg/diagnosticspb" + "github.com/pingcap/log" + "github.com/soheilhy/cmux" + "github.com/spf13/cobra" + "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/mcs/discovery" + mcsutils "github.com/tikv/pd/pkg/mcs/utils" + "github.com/tikv/pd/pkg/member" + "github.com/tikv/pd/pkg/utils/etcdutil" + "github.com/tikv/pd/pkg/utils/logutil" + "github.com/tikv/pd/pkg/utils/memberutil" + "github.com/tikv/pd/pkg/utils/metricutil" + "github.com/tikv/pd/pkg/versioninfo" + "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/pkg/types" + "go.uber.org/zap" + "google.golang.org/grpc" +) + +// Server is the scheduling server, and it implements bs.Server. +type Server struct { + diagnosticspb.DiagnosticsServer + + // Server state. 0 is not running, 1 is running. + isRunning int64 + + ctx context.Context + serverLoopCtx context.Context + serverLoopCancel func() + serverLoopWg sync.WaitGroup + + cfg *Config + clusterID uint64 + name string + listenURL *url.URL + + // for the primary election of scheduling + participant *member.Participant + etcdClient *clientv3.Client + httpClient *http.Client + + secure bool + muxListener net.Listener + httpListener net.Listener + grpcServer *grpc.Server + httpServer *http.Server + service *Service + + // Callback functions for different stages + // startCallbacks will be called after the server is started. + startCallbacks []func() + // primaryCallbacks will be called after the server becomes leader. + primaryCallbacks []func(context.Context) + + serviceRegister *discovery.ServiceRegister +} + +// Name returns the unique etcd name for this server in etcd cluster. +func (s *Server) Name() string { + return s.name +} + +// Context returns the context. +func (s *Server) Context() context.Context { + return s.ctx +} + +// GetAddr returns the server address. +func (s *Server) GetAddr() string { + return s.cfg.ListenAddr +} + +// Run runs the scheduling server. +func (s *Server) Run() (err error) { + if err = s.initClient(); err != nil { + return err + } + if err = s.startServer(); err != nil { + return err + } + + s.startServerLoop() + + return nil +} + +func (s *Server) startServerLoop() { + s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx) + s.serverLoopWg.Add(1) + go s.primaryElectionLoop() +} + +func (s *Server) primaryElectionLoop() { + defer logutil.LogPanic() + defer s.serverLoopWg.Done() + + for { + if s.IsClosed() { + log.Info("server is closed, exit scheduling primary election loop") + return + } + + primary, checkAgain := s.participant.CheckLeader() + if checkAgain { + continue + } + if primary != nil { + log.Info("start to watch the primary", zap.Stringer("scheduling-primary", primary)) + // Watch will keep looping and never return unless the primary/leader has changed. 
+ primary.Watch(s.serverLoopCtx) + log.Info("the scheduling primary has changed, try to re-campaign a primary") + } + + s.campaignLeader() + } +} + +func (s *Server) campaignLeader() { + log.Info("start to campaign the primary/leader", zap.String("campaign-scheduling-primary-name", s.participant.Name())) + if err := s.participant.CampaignLeader(s.cfg.LeaderLease); err != nil { + if err.Error() == errs.ErrEtcdTxnConflict.Error() { + log.Info("campaign scheduling primary meets error due to txn conflict, another server may campaign successfully", + zap.String("campaign-scheduling-primary-name", s.participant.Name())) + } else { + log.Error("campaign scheduling primary meets error due to etcd error", + zap.String("campaign-scheduling-primary-name", s.participant.Name()), + errs.ZapError(err)) + } + return + } + + // Start keepalive the leadership and enable scheduling service. + ctx, cancel := context.WithCancel(s.serverLoopCtx) + var resetLeaderOnce sync.Once + defer resetLeaderOnce.Do(func() { + cancel() + s.participant.ResetLeader() + }) + + // maintain the leadership, after this, scheduling could be ready to provide service. + s.participant.KeepLeader(ctx) + log.Info("campaign scheduling primary ok", zap.String("campaign-scheduling-primary-name", s.participant.Name())) + + log.Info("triggering the primary callback functions") + for _, cb := range s.primaryCallbacks { + cb(ctx) + } + + s.participant.EnableLeader() + log.Info("scheduling primary is ready to serve", zap.String("scheduling-primary-name", s.participant.Name())) + + leaderTicker := time.NewTicker(mcsutils.LeaderTickInterval) + defer leaderTicker.Stop() + + for { + select { + case <-leaderTicker.C: + if !s.participant.IsLeader() { + log.Info("no longer a primary/leader because lease has expired, the scheduling primary/leader will step down") + return + } + case <-ctx.Done(): + // Server is closed and it should return nil. + log.Info("server is closed") + return + } + } +} + +// Close closes the server. +func (s *Server) Close() { + if !atomic.CompareAndSwapInt64(&s.isRunning, 1, 0) { + // server is already closed + return + } + + log.Info("closing scheduling server ...") + s.serviceRegister.Deregister() + s.stopHTTPServer() + s.stopGRPCServer() + s.muxListener.Close() + s.serverLoopCancel() + s.serverLoopWg.Wait() + + if s.etcdClient != nil { + if err := s.etcdClient.Close(); err != nil { + log.Error("close etcd client meet error", errs.ZapError(errs.ErrCloseEtcdClient, err)) + } + } + + if s.httpClient != nil { + s.httpClient.CloseIdleConnections() + } + + log.Info("scheduling server is closed") +} + +// GetClient returns builtin etcd client. +func (s *Server) GetClient() *clientv3.Client { + return s.etcdClient +} + +// GetHTTPClient returns builtin http client. +func (s *Server) GetHTTPClient() *http.Client { + return s.httpClient +} + +// GetLeaderListenUrls gets service endpoints from the leader in election group. +func (s *Server) GetLeaderListenUrls() []string { + return s.participant.GetLeaderListenUrls() +} + +// AddStartCallback adds a callback in the startServer phase. +func (s *Server) AddStartCallback(callbacks ...func()) { + s.startCallbacks = append(s.startCallbacks, callbacks...) +} + +// IsServing returns whether the server is the leader, if there is embedded etcd, or the primary otherwise. 
+func (s *Server) IsServing() bool { + return !s.IsClosed() && s.participant.IsLeader() +} + +// IsClosed checks if the server loop is closed +func (s *Server) IsClosed() bool { + return s != nil && atomic.LoadInt64(&s.isRunning) == 0 +} + +// AddServiceReadyCallback adds callbacks when the server becomes the leader, if there is embedded etcd, or the primary otherwise. +func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) { + s.primaryCallbacks = append(s.primaryCallbacks, callbacks...) +} + +func (s *Server) initClient() error { + tlsConfig, err := s.cfg.Security.ToTLSConfig() + if err != nil { + return err + } + u, err := types.NewURLs(strings.Split(s.cfg.BackendEndpoints, ",")) + if err != nil { + return err + } + s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, []url.URL(u)[0]) + return err +} + +func (s *Server) startGRPCServer(l net.Listener) { + defer logutil.LogPanic() + defer s.serverLoopWg.Done() + + log.Info("grpc server starts serving", zap.String("address", l.Addr().String())) + err := s.grpcServer.Serve(l) + if s.IsClosed() { + log.Info("grpc server stopped") + } else { + log.Fatal("grpc server stopped unexpectedly", errs.ZapError(err)) + } +} + +func (s *Server) startHTTPServer(l net.Listener) { + defer logutil.LogPanic() + defer s.serverLoopWg.Done() + + log.Info("http server starts serving", zap.String("address", l.Addr().String())) + err := s.httpServer.Serve(l) + if s.IsClosed() { + log.Info("http server stopped") + } else { + log.Fatal("http server stopped unexpectedly", errs.ZapError(err)) + } +} + +func (s *Server) startGRPCAndHTTPServers(serverReadyChan chan<- struct{}, l net.Listener) { + defer logutil.LogPanic() + defer s.serverLoopWg.Done() + + mux := cmux.New(l) + // Don't hang on matcher after closing listener + mux.SetReadTimeout(3 * time.Second) + grpcL := mux.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc")) + if s.secure { + s.httpListener = mux.Match(cmux.Any()) + } else { + s.httpListener = mux.Match(cmux.HTTP1()) + } + s.grpcServer = grpc.NewServer() + s.service.RegisterGRPCService(s.grpcServer) + diagnosticspb.RegisterDiagnosticsServer(s.grpcServer, s) + s.serverLoopWg.Add(1) + go s.startGRPCServer(grpcL) + + handler, _ := SetUpRestHandler(s.service) + s.httpServer = &http.Server{ + Handler: handler, + ReadTimeout: 3 * time.Second, + } + s.serverLoopWg.Add(1) + go s.startHTTPServer(s.httpListener) + + serverReadyChan <- struct{}{} + if err := mux.Serve(); err != nil { + if s.IsClosed() { + log.Info("mux stopped serving", errs.ZapError(err)) + } else { + log.Fatal("mux stopped serving unexpectedly", errs.ZapError(err)) + } + } +} + +func (s *Server) stopHTTPServer() { + log.Info("stopping http server") + defer log.Info("http server stopped") + + ctx, cancel := context.WithTimeout(context.Background(), mcsutils.DefaultHTTPGracefulShutdownTimeout) + defer cancel() + + // First, try to gracefully shutdown the http server + ch := make(chan struct{}) + go func() { + defer close(ch) + s.httpServer.Shutdown(ctx) + }() + + select { + case <-ch: + case <-ctx.Done(): + // Took too long, manually close open transports + log.Warn("http server graceful shutdown timeout, forcing close") + s.httpServer.Close() + // concurrent Graceful Shutdown should be interrupted + <-ch + } +} + +func (s *Server) stopGRPCServer() { + log.Info("stopping grpc server") + defer log.Info("grpc server stopped") + + // Do not grpc.Server.GracefulStop with TLS enabled etcd server + // See 
https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531 + // and https://github.com/etcd-io/etcd/issues/8916 + if s.secure { + s.grpcServer.Stop() + return + } + + ctx, cancel := context.WithTimeout(context.Background(), mcsutils.DefaultGRPCGracefulStopTimeout) + defer cancel() + + // First, try to gracefully shutdown the grpc server + ch := make(chan struct{}) + go func() { + defer close(ch) + // Close listeners to stop accepting new connections, + // will block on any existing transports + s.grpcServer.GracefulStop() + }() + + // Wait until all pending RPCs are finished + select { + case <-ch: + case <-ctx.Done(): + // Took too long, manually close open transports + // e.g. watch streams + log.Warn("grpc server graceful shutdown timeout, forcing close") + s.grpcServer.Stop() + // concurrent GracefulStop should be interrupted + <-ch + } +} + +func (s *Server) startServer() (err error) { + if s.clusterID, err = mcsutils.InitClusterID(s.ctx, s.etcdClient); err != nil { + return err + } + log.Info("init cluster id", zap.Uint64("cluster-id", s.clusterID)) + // The independent scheduling service still reuses PD version info since PD and scheduling are just + // different service modes provided by the same pd-server binary + serverInfo.WithLabelValues(versioninfo.PDReleaseVersion, versioninfo.PDGitHash).Set(float64(time.Now().Unix())) + + uniqueName := s.cfg.ListenAddr + uniqueID := memberutil.GenerateUniqueID(uniqueName) + log.Info("joining primary election", zap.String("participant-name", uniqueName), zap.Uint64("participant-id", uniqueID)) + schedulingPrimaryPrefix := fmt.Sprintf("/ms/%d/scheduling", s.clusterID) + s.participant = member.NewParticipant(s.etcdClient) + s.participant.InitInfo(uniqueName, uniqueID, path.Join(schedulingPrimaryPrefix, fmt.Sprintf("%05d", 0)), + "primary", "keyspace group primary election", s.cfg.AdvertiseListenAddr) + + s.service = &Service{Server: s} + + tlsConfig, err := s.cfg.Security.ToTLSConfig() + if err != nil { + return err + } + s.listenURL, err = url.Parse(s.cfg.ListenAddr) + if err != nil { + return err + } + if tlsConfig != nil { + s.secure = true + s.muxListener, err = tls.Listen(mcsutils.TCPNetworkStr, s.listenURL.Host, tlsConfig) + } else { + s.muxListener, err = net.Listen(mcsutils.TCPNetworkStr, s.listenURL.Host) + } + if err != nil { + return err + } + + serverReadyChan := make(chan struct{}) + defer close(serverReadyChan) + s.serverLoopWg.Add(1) + go s.startGRPCAndHTTPServers(serverReadyChan, s.muxListener) + <-serverReadyChan + + // Run callbacks + log.Info("triggering the start callback functions") + for _, cb := range s.startCallbacks { + cb() + } + + // Server has started. + entry := &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr} + serializedEntry, err := entry.Serialize() + if err != nil { + return err + } + s.serviceRegister = discovery.NewServiceRegister(s.ctx, s.etcdClient, strconv.FormatUint(s.clusterID, 10), + mcsutils.SchedulingServiceName, s.cfg.AdvertiseListenAddr, serializedEntry, discovery.DefaultLeaseInSeconds) + if err := s.serviceRegister.Register(); err != nil { + log.Error("failed to register the service", zap.String("service-name", mcsutils.SchedulingServiceName), errs.ZapError(err)) + return err + } + atomic.StoreInt64(&s.isRunning, 1) + return nil +} + +// NewServer creates a new scheduling server. 
+func NewServer(ctx context.Context, cfg *Config) *Server { + return &Server{ + name: cfg.Name, + ctx: ctx, + cfg: cfg, + } +} + +// CreateServerWrapper encapsulates the configuration/log/metrics initialization and create the server +func CreateServerWrapper(cmd *cobra.Command, args []string) { + cmd.Flags().Parse(args) + cfg := NewConfig() + flagSet := cmd.Flags() + err := cfg.Parse(flagSet) + defer logutil.LogPanic() + + if err != nil { + cmd.Println(err) + return + } + + if printVersion, err := flagSet.GetBool("version"); err != nil { + cmd.Println(err) + return + } else if printVersion { + versioninfo.Print() + exit(0) + } + + // New zap logger + err = logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog) + if err == nil { + log.ReplaceGlobals(cfg.Logger, cfg.LogProps) + } else { + log.Fatal("initialize logger error", errs.ZapError(err)) + } + // Flushing any buffered log entries + defer log.Sync() + + versioninfo.Log("Scheduling") + log.Info("scheduling config", zap.Reflect("config", cfg)) + + grpcprometheus.EnableHandlingTimeHistogram() + + metricutil.Push(&cfg.Metric) + + ctx, cancel := context.WithCancel(context.Background()) + svr := NewServer(ctx, cfg) + + sc := make(chan os.Signal, 1) + signal.Notify(sc, + syscall.SIGHUP, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGQUIT) + + var sig os.Signal + go func() { + sig = <-sc + cancel() + }() + + if err := svr.Run(); err != nil { + log.Fatal("run server failed", errs.ZapError(err)) + } + + <-ctx.Done() + log.Info("got signal to exit", zap.String("signal", sig.String())) + + svr.Close() + switch sig { + case syscall.SIGTERM: + exit(0) + default: + exit(1) + } +} + +func exit(code int) { + log.Sync() + os.Exit(code) +} diff --git a/pkg/mcs/utils/constant.go b/pkg/mcs/utils/constant.go index b73ddd7ed8fa..af1b5a207e9a 100644 --- a/pkg/mcs/utils/constant.go +++ b/pkg/mcs/utils/constant.go @@ -57,6 +57,8 @@ const ( TSOServiceName = "tso" // ResourceManagerServiceName is the name of resource manager server. ResourceManagerServiceName = "resource_manager" + // SchedulingServiceName is the name of scheduling server. + SchedulingServiceName = "scheduling" // KeyspaceGroupsKey is the path component of keyspace groups. KeyspaceGroupsKey = "keyspace_groups" diff --git a/pkg/schedule/config/config.go b/pkg/schedule/config/config.go index a1fa70875c9f..150536409763 100644 --- a/pkg/schedule/config/config.go +++ b/pkg/schedule/config/config.go @@ -84,6 +84,7 @@ type Config interface { GetClusterVersion() *semver.Version GetStoreLimitVersion() string IsDiagnosticAllowed() bool + IsSchedulingHalted() bool // for test purpose SetPlacementRuleEnabled(bool) SetSplitMergeInterval(time.Duration) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 67e4e3c70cf5..eedddeccaca2 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -591,6 +591,7 @@ func (c *RaftCluster) runUpdateStoreStats() { } } +// runCoordinator runs the main scheduling loop. func (c *RaftCluster) runCoordinator() { defer logutil.LogPanic() defer c.wg.Done()
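Not part of the patch above: the sketch below shows one way the new scheduling service could be launched once this framework lands, by wiring CreateServerWrapper into a cobra command. Only the flag names are taken from what Config.Parse and CreateServerWrapper actually read; the standalone main package, command name, descriptions, and defaults are illustrative assumptions for local experimentation.

package main

import (
	"github.com/spf13/cobra"

	scheduling "github.com/tikv/pd/pkg/mcs/scheduling/server"
)

func main() {
	// Hypothetical launcher: Run delegates to CreateServerWrapper, which parses the
	// flags below via Config.Parse, sets up logging/metrics, and starts the server.
	cmd := &cobra.Command{
		Use:   "scheduling",
		Short: "run the scheduling microservice (illustrative wrapper, not part of this patch)",
		Run:   scheduling.CreateServerWrapper,
	}
	// Flag names mirror the ones consumed in Config.Parse and CreateServerWrapper;
	// the default values here are assumptions for a local single-node setup.
	cmd.Flags().Bool("version", false, "print version information and exit")
	cmd.Flags().String("config", "", "config file")
	cmd.Flags().String("backend-endpoints", "http://127.0.0.1:2379", "url for etcd client")
	cmd.Flags().String("listen-addr", "http://127.0.0.1:3379", "listen url for client traffic")
	cmd.Flags().String("advertise-listen-addr", "", "advertise url for client traffic (defaults to listen-addr)")
	cmd.Flags().String("metrics-addr", "", "prometheus pushgateway address")
	cmd.Flags().String("log-level", "info", "log level: debug, info, warn, error, fatal")
	cmd.Flags().String("log-file", "", "log file path")
	cmd.Flags().String("cacert", "", "path of file that contains list of trusted TLS CAs")
	cmd.Flags().String("cert", "", "path of file that contains X509 certificate in PEM format")
	cmd.Flags().String("key", "", "path of file that contains X509 key in PEM format")
	if err := cmd.Execute(); err != nil {
		cmd.PrintErrln(err)
	}
}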