Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: remove unnecessary interfaces #6551

Merged
merged 2 commits into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func (bc *BasicCluster) GetStoresWriteRate() (storeIDs []uint64, bytesRates, key

// RegionSetInformer provides access to a shared informer of regions.
type RegionSetInformer interface {
GetRegionCount() int
GetTotalRegionCount() int
RandFollowerRegions(storeID uint64, ranges []KeyRange) []*RegionInfo
RandLeaderRegions(storeID uint64, ranges []KeyRange) []*RegionInfo
RandLearnerRegions(storeID uint64, ranges []KeyRange) []*RegionInfo
Expand Down
16 changes: 8 additions & 8 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -1278,8 +1278,8 @@ func (r *RegionsInfo) GetStoreStats(storeID uint64) (leader, region, witness, le
r.learners[storeID].length(), r.pendingPeers[storeID].length(), r.leaders[storeID].TotalSize(), r.getStoreRegionSizeLocked(storeID)
}

// GetRegionCount gets the total count of RegionInfo of regionMap
func (r *RegionsInfo) GetRegionCount() int {
// GetTotalRegionCount gets the total count of RegionInfo of regionMap
func (r *RegionsInfo) GetTotalRegionCount() int {
r.t.RLock()
defer r.t.RUnlock()
return len(r.regions)
Expand Down Expand Up @@ -1473,8 +1473,8 @@ func (r *RegionInfo) GetWriteLoads() []float64 {
}
}

// GetRangeCount returns the number of regions that overlap with the range [startKey, endKey).
func (r *RegionsInfo) GetRangeCount(startKey, endKey []byte) int {
// GetRegionCount returns the number of regions that overlap with the range [startKey, endKey).
func (r *RegionsInfo) GetRegionCount(startKey, endKey []byte) int {
r.t.RLock()
defer r.t.RUnlock()
start := &regionItem{&RegionInfo{meta: &metapb.Region{StartKey: startKey}}}
Expand All @@ -1496,9 +1496,9 @@ func (r *RegionsInfo) GetRangeCount(startKey, endKey []byte) int {
return endIndex - startIndex + 1
}

// ScanRange scans regions intersecting [start key, end key), returns at most
// ScanRegions scans regions intersecting [start key, end key), returns at most
// `limit` regions. limit <= 0 means no limit.
func (r *RegionsInfo) ScanRange(startKey, endKey []byte, limit int) []*RegionInfo {
func (r *RegionsInfo) ScanRegions(startKey, endKey []byte, limit int) []*RegionInfo {
r.t.RLock()
defer r.t.RUnlock()
var res []*RegionInfo
Expand All @@ -1515,9 +1515,9 @@ func (r *RegionsInfo) ScanRange(startKey, endKey []byte, limit int) []*RegionInf
return res
}

// ScanRangeWithIterator scans from the first region containing or behind start key,
// ScanRegionWithIterator scans from the first region containing or behind start key,
// until iterator returns false.
func (r *RegionsInfo) ScanRangeWithIterator(startKey []byte, iterator func(region *RegionInfo) bool) {
func (r *RegionsInfo) ScanRegionWithIterator(startKey []byte, iterator func(region *RegionInfo) bool) {
r.t.RLock()
defer r.t.RUnlock()
r.tree.scanRange(startKey, iterator)
Expand Down
2 changes: 1 addition & 1 deletion pkg/dashboard/keyvisual/input/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func clusterScan(rc *core.BasicCluster) RegionsInfo {
regions := make([]*core.RegionInfo, 0, limit)

for {
rs := rc.ScanRange(startKey, endKey, limit)
rs := rc.ScanRegions(startKey, endKey, limit)
length := len(rs)
if length == 0 {
break
Expand Down
19 changes: 0 additions & 19 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,20 +112,9 @@ func (mc *Cluster) GetPersistOptions() *config.PersistOptions {
// UpdateRegionsLabelLevelStats updates the label level stats for the regions.
func (mc *Cluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) {}

// IsSchedulerExisted checks if the scheduler with name is existed or not.
func (mc *Cluster) IsSchedulerExisted(name string) (bool, error) { return false, nil }

// IsSchedulerDisabled checks if the scheduler with name is disabled or not.
func (mc *Cluster) IsSchedulerDisabled(name string) (bool, error) { return false, nil }

// CheckSchedulingAllowance checks if the cluster allows scheduling currently.
func (mc *Cluster) CheckSchedulingAllowance() (bool, error) { return true, nil }

// ScanRegions scans region with start key, until number greater than limit.
func (mc *Cluster) ScanRegions(startKey, endKey []byte, limit int) []*core.RegionInfo {
return mc.ScanRange(startKey, endKey, limit)
}

// LoadRegion puts region info without leader
func (mc *Cluster) LoadRegion(regionID uint64, peerStoreIDs ...uint64) {
// regions load from etcd will have no leader
Expand Down Expand Up @@ -817,11 +806,6 @@ func (mc *Cluster) PutStoreWithLabels(id uint64, labelPairs ...string) {
mc.AddLabelsStore(id, 0, labels)
}

// RemoveScheduler mocks method.
func (mc *Cluster) RemoveScheduler(name string) error {
return nil
}

// MockRegionInfo returns a mock region
// If leaderStoreID is zero, the regions would have no leader
func (mc *Cluster) MockRegionInfo(regionID uint64, leaderStoreID uint64,
Expand Down Expand Up @@ -950,6 +934,3 @@ func (mc *Cluster) ObserveRegionsStats() {
storeIDs, writeBytesRates, writeKeysRates := mc.BasicCluster.GetStoresWriteRate()
mc.HotStat.ObserveRegionsStats(storeIDs, writeBytesRates, writeKeysRates)
}

// RecordOpStepWithTTL records OpStep with TTL
func (mc *Cluster) RecordOpStepWithTTL(regionID uint64) {}
2 changes: 1 addition & 1 deletion pkg/replication/replication_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ func (m *ModeManager) updateProgress() {
key = r.GetEndKey()
}
m.drSampleTotalRegion = len(sampleRegions)
m.drTotalRegion = m.cluster.GetRegionCount()
m.drTotalRegion = m.cluster.GetTotalRegionCount()
return
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/checker/merge_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ func (suite *mergeCheckerTestSuite) TestStoreLimitWithMerge() {

mc := NewMergeChecker(suite.ctx, tc, tc.GetOpts())
stream := hbstream.NewTestHeartbeatStreams(suite.ctx, tc.ID, tc, false /* no need to run */)
oc := operator.NewController(suite.ctx, tc, stream)
oc := operator.NewController(suite.ctx, tc.GetBasicCluster(), tc.GetOpts(), stream)

regions[2] = regions[2].Clone(
core.SetPeers([]*metapb.Peer{
Expand Down
46 changes: 26 additions & 20 deletions pkg/schedule/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,22 +90,23 @@ type Coordinator struct {
// NewCoordinator creates a new Coordinator.
func NewCoordinator(ctx context.Context, cluster sche.ClusterInformer, hbStreams *hbstream.HeartbeatStreams) *Coordinator {
ctx, cancel := context.WithCancel(ctx)
opController := operator.NewController(ctx, cluster, hbStreams)
opController := operator.NewController(ctx, cluster.GetBasicCluster(), cluster.GetPersistOptions(), hbStreams)
schedulers := make(map[string]*scheduleController)
return &Coordinator{
ctx: ctx,
cancel: cancel,
cluster: cluster,
prepareChecker: newPrepareChecker(),
checkers: checker.NewController(ctx, cluster, cluster.GetOpts(), cluster.GetRuleManager(), cluster.GetRegionLabeler(), opController),
regionScatterer: NewRegionScatterer(ctx, cluster, opController),
regionSplitter: NewRegionSplitter(cluster, NewSplitRegionsHandler(cluster, opController)),
schedulers: schedulers,
opController: opController,
hbStreams: hbStreams,
pluginInterface: NewPluginInterface(),
diagnosticManager: newDiagnosticManager(cluster),
}
c := &Coordinator{
ctx: ctx,
cancel: cancel,
cluster: cluster,
prepareChecker: newPrepareChecker(),
checkers: checker.NewController(ctx, cluster, cluster.GetOpts(), cluster.GetRuleManager(), cluster.GetRegionLabeler(), opController),
regionScatterer: NewRegionScatterer(ctx, cluster, opController),
regionSplitter: NewRegionSplitter(cluster, NewSplitRegionsHandler(cluster, opController)),
schedulers: schedulers,
opController: opController,
hbStreams: hbStreams,
pluginInterface: NewPluginInterface(),
}
c.diagnosticManager = newDiagnosticManager(c, cluster.GetPersistOptions())
return c
}

// GetWaitingRegions returns the regions in the waiting list.
Expand Down Expand Up @@ -304,7 +305,7 @@ func (c *Coordinator) drivePushOperator() {
log.Info("drive push operator has been stopped")
return
case <-ticker.C:
c.opController.PushOperators()
c.opController.PushOperators(c.RecordOpStepWithTTL)
}
}
}
Expand Down Expand Up @@ -381,7 +382,7 @@ func (c *Coordinator) Run() {
log.Info("skip create scheduler with independent configuration", zap.String("scheduler-name", name), zap.String("scheduler-type", cfg.Type), zap.Strings("scheduler-args", cfg.Args))
continue
}
s, err := schedulers.CreateScheduler(cfg.Type, c.opController, c.cluster.GetStorage(), schedulers.ConfigJSONDecoder([]byte(data)))
s, err := schedulers.CreateScheduler(cfg.Type, c.opController, c.cluster.GetStorage(), schedulers.ConfigJSONDecoder([]byte(data)), c.RemoveScheduler)
if err != nil {
log.Error("can not create scheduler with independent configuration", zap.String("scheduler-name", name), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err))
continue
Expand All @@ -402,7 +403,7 @@ func (c *Coordinator) Run() {
continue
}

s, err := schedulers.CreateScheduler(schedulerCfg.Type, c.opController, c.cluster.GetStorage(), schedulers.ConfigSliceDecoder(schedulerCfg.Type, schedulerCfg.Args))
s, err := schedulers.CreateScheduler(schedulerCfg.Type, c.opController, c.cluster.GetStorage(), schedulers.ConfigSliceDecoder(schedulerCfg.Type, schedulerCfg.Args), c.RemoveScheduler)
if err != nil {
log.Error("can not create scheduler", zap.String("scheduler-type", schedulerCfg.Type), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err))
continue
Expand Down Expand Up @@ -451,7 +452,7 @@ func (c *Coordinator) LoadPlugin(pluginPath string, ch chan string) {
}
schedulerArgs := SchedulerArgs.(func() []string)
// create and add user scheduler
s, err := schedulers.CreateScheduler(schedulerType(), c.opController, c.cluster.GetStorage(), schedulers.ConfigSliceDecoder(schedulerType(), schedulerArgs()))
s, err := schedulers.CreateScheduler(schedulerType(), c.opController, c.cluster.GetStorage(), schedulers.ConfigSliceDecoder(schedulerType(), schedulerArgs()), c.RemoveScheduler)
if err != nil {
log.Error("can not create scheduler", zap.String("scheduler-type", schedulerType()), errs.ZapError(err))
return
Expand Down Expand Up @@ -706,7 +707,7 @@ func (c *Coordinator) removeOptScheduler(o *config.PersistOptions, name string)
for i, schedulerCfg := range v.Schedulers {
// To create a temporary scheduler is just used to get scheduler's name
decoder := schedulers.ConfigSliceDecoder(schedulerCfg.Type, schedulerCfg.Args)
tmp, err := schedulers.CreateScheduler(schedulerCfg.Type, operator.NewController(c.ctx, nil, nil), storage.NewStorageWithMemoryBackend(), decoder)
tmp, err := schedulers.CreateScheduler(schedulerCfg.Type, c.opController, storage.NewStorageWithMemoryBackend(), decoder, c.RemoveScheduler)
if err != nil {
return err
}
Expand Down Expand Up @@ -927,6 +928,11 @@ func (c *Coordinator) GetDiagnosticResult(name string) (*DiagnosticResult, error
return c.diagnosticManager.getDiagnosticResult(name)
}

// RecordOpStepWithTTL records OpStep with TTL
func (c *Coordinator) RecordOpStepWithTTL(regionID uint64) {
c.GetRuleChecker().RecordRegionPromoteToNonWitness(regionID)
}

// scheduleController is used to manage a scheduler to schedulers.
type scheduleController struct {
schedulers.Scheduler
Expand Down
17 changes: 9 additions & 8 deletions pkg/schedule/core/cluster_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,23 @@ type ClusterInformer interface {
GetAllocator() id.Allocator
GetRegionLabeler() *labeler.RegionLabeler
GetStorage() storage.Storage
RemoveScheduler(name string) error
AddSuspectRegions(ids ...uint64)
RecordOpStepWithTTL(regionID uint64)
UpdateRegionsLabelLevelStats(regions []*core.RegionInfo)
IsSchedulerExisted(name string) (bool, error)
IsSchedulerDisabled(name string) (bool, error)
CheckSchedulingAllowance() (bool, error)
AddSuspectRegions(ids ...uint64)
GetPersistOptions() *config.PersistOptions
}

// RegionHealthCluster is an aggregate interface that wraps multiple interfaces
type RegionHealthCluster interface {
core.StoreSetInformer
core.StoreSetController
core.RegionSetInformer
BasicCluster

GetOpts() sc.Config
GetRuleManager() *placement.RuleManager
}

// BasicCluster is an aggregate interface that wraps multiple interfaces
type BasicCluster interface {
core.StoreSetInformer
core.StoreSetController
core.RegionSetInformer
}
32 changes: 18 additions & 14 deletions pkg/schedule/diagnostic_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/tikv/pd/pkg/cache"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/movingaverage"
sche "github.com/tikv/pd/pkg/schedule/core"
sc "github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/plan"
"github.com/tikv/pd/pkg/schedule/schedulers"
Expand Down Expand Up @@ -56,28 +56,30 @@ var DiagnosableSummaryFunc = map[string]plan.Summary{
}

type diagnosticManager struct {
cluster sche.ClusterInformer
recorders map[string]*diagnosticRecorder
coordinator *Coordinator
config sc.Config
recorders map[string]*diagnosticRecorder
}

func newDiagnosticManager(cluster sche.ClusterInformer) *diagnosticManager {
func newDiagnosticManager(coordinator *Coordinator, config sc.Config) *diagnosticManager {
recorders := make(map[string]*diagnosticRecorder)
for name := range DiagnosableSummaryFunc {
recorders[name] = newDiagnosticRecorder(name, cluster)
recorders[name] = newDiagnosticRecorder(name, coordinator, config)
}
return &diagnosticManager{
cluster: cluster,
recorders: recorders,
coordinator: coordinator,
config: config,
recorders: recorders,
}
}

func (d *diagnosticManager) getDiagnosticResult(name string) (*DiagnosticResult, error) {
if !d.cluster.GetOpts().IsDiagnosticAllowed() {
if !d.config.IsDiagnosticAllowed() {
return nil, errs.ErrDiagnosticDisabled
}

isSchedulerExisted, _ := d.cluster.IsSchedulerExisted(name)
isDisabled, _ := d.cluster.IsSchedulerDisabled(name)
isSchedulerExisted, _ := d.coordinator.IsSchedulerExisted(name)
isDisabled, _ := d.coordinator.IsSchedulerDisabled(name)
if !isSchedulerExisted || isDisabled {
ts := uint64(time.Now().Unix())
res := &DiagnosticResult{Name: name, Timestamp: ts, Status: disabled}
Expand All @@ -102,20 +104,22 @@ func (d *diagnosticManager) getRecorder(name string) *diagnosticRecorder {
// diagnosticRecorder is used to manage diagnostic for one scheduler.
type diagnosticRecorder struct {
schedulerName string
cluster sche.ClusterInformer
coordinator *Coordinator
config sc.Config
summaryFunc plan.Summary
results *cache.FIFO
}

func newDiagnosticRecorder(name string, cluster sche.ClusterInformer) *diagnosticRecorder {
func newDiagnosticRecorder(name string, coordinator *Coordinator, config sc.Config) *diagnosticRecorder {
summaryFunc, ok := DiagnosableSummaryFunc[name]
if !ok {
log.Error("can't find summary function", zap.String("scheduler-name", name))
return nil
}
return &diagnosticRecorder{
cluster: cluster,
coordinator: coordinator,
schedulerName: name,
config: config,
summaryFunc: summaryFunc,
results: cache.NewFIFO(maxDiagnosticResultNum),
}
Expand All @@ -125,7 +129,7 @@ func (d *diagnosticRecorder) isAllowed() bool {
if d == nil {
return false
}
return d.cluster.GetOpts().IsDiagnosticAllowed()
return d.config.IsDiagnosticAllowed()
}

func (d *diagnosticRecorder) getLastResult() *DiagnosticResult {
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/filter/region_filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (f *regionEmptyFilter) Select(region *core.RegionInfo) *plan.Status {

// isEmptyRegionAllowBalance returns true if the region is not empty or the number of regions is too small.
func isEmptyRegionAllowBalance(cluster sche.RegionHealthCluster, region *core.RegionInfo) bool {
return region.GetApproximateSize() > core.EmptyRegionApproximateSize || cluster.GetRegionCount() < core.InitClusterRegionThreshold
return region.GetApproximateSize() > core.EmptyRegionApproximateSize || cluster.GetTotalRegionCount() < core.InitClusterRegionThreshold
}

type regionWitnessFilter struct {
Expand Down
Loading