
Commit

impl
Signed-off-by: 童剑 <[email protected]>
bufferflies committed Jan 21, 2025
1 parent 46bd44d commit 66f70c2
Showing 4 changed files with 186 additions and 70 deletions.
4 changes: 0 additions & 4 deletions pkg/core/constant/kind.go
@@ -78,10 +78,6 @@ func (k ResourceKind) String() string {
return "region"
case WitnessKind:
return "witness"
case LearnerKind:
return `learner`
case unKnownKind:
return "unknown"
default:
return "unknown"
}
27 changes: 26 additions & 1 deletion pkg/schedule/operator/operator_controller.go
@@ -15,6 +15,7 @@
package operator

import (
"bytes"
"context"
"fmt"
"strconv"
@@ -828,14 +829,33 @@ func (oc *Controller) GetHistory(start time.Time) []OpHistory {
return history
}

// OpInfluenceOption is used to filter regions when calculating operator influence.
// If it returns true, the region's influence is included in the calculation;
// if it returns false, the region is skipped.
type OpInfluenceOption func(region *core.RegionInfo) bool

// WithRangeOption returns an OpInfluenceOption that keeps only regions fully contained in the given key ranges.
func WithRangeOption(ranges []core.KeyRange) OpInfluenceOption {
return func(region *core.RegionInfo) bool {
for _, r := range ranges {
// the region's start key must be no less than the given range's start key,
// and its end key must be no greater than the given range's end key.
if bytes.Compare(region.GetStartKey(), r.StartKey) < 0 || bytes.Compare(r.EndKey, region.GetEndKey()) < 0 {
return false
}
}
return true
}
}
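
For reference, a minimal sketch (not code from this commit) of how a caller could combine the new WithRangeOption with GetOpInfluence so that only regions fully contained in a scheduler's key ranges contribute to the influence totals; the helper rangeScopedInfluence and its parameters are illustrative.

package example

import (
	"github.com/tikv/pd/pkg/core"
	"github.com/tikv/pd/pkg/schedule/operator"
)

// rangeScopedInfluence asks the controller for the influence of in-flight
// operators, counting only operators whose region lies entirely inside ranges.
func rangeScopedInfluence(ctl *operator.Controller, cluster *core.BasicCluster,
	ranges []core.KeyRange, storeID uint64) *operator.StoreInfluence {
	influence := ctl.GetOpInfluence(cluster, operator.WithRangeOption(ranges))
	// Callers should nil-check the result, as balance_range.go does below.
	return influence.GetStoreInfluence(storeID)
}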

// OperatorCount gets the count of operators filtered by kind.
// kind only has one OpKind.
func (oc *Controller) OperatorCount(kind OpKind) uint64 {
return oc.counts.getCountByKind(kind)
}

// GetOpInfluence gets OpInfluence.
func (oc *Controller) GetOpInfluence(cluster *core.BasicCluster) OpInfluence {
func (oc *Controller) GetOpInfluence(cluster *core.BasicCluster, ops ...OpInfluenceOption) OpInfluence {
influence := OpInfluence{
StoresInfluence: make(map[uint64]*StoreInfluence),
}
@@ -844,6 +864,11 @@ func (oc *Controller) GetOpInfluence(cluster *core.BasicCluster) OpInfluence {
op := value.(*Operator)
if !op.CheckTimeout() && !op.CheckSuccess() {
region := cluster.GetRegion(op.RegionID())
for _, opt := range ops {
if !opt(region) {
return true
}
}
if region != nil {
op.UnfinishedInfluence(influence, region)
}
211 changes: 152 additions & 59 deletions pkg/schedule/schedulers/balance_range.go
@@ -18,27 +18,28 @@ import (
"go.uber.org/zap"
"net/http"
"sort"
"strconv"
"time"

"github.com/gorilla/mux"
"github.com/unrolled/render"

"github.com/pingcap/log"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"

"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/errs"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/filter"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/schedule/plan"
"github.com/tikv/pd/pkg/schedule/types"
"github.com/tikv/pd/pkg/utils/syncutil"
)

const balanceRangeName ="balance-range-scheduler"
const balanceRangeName = "balance-range-scheduler"

type balanceRangeSchedulerHandler struct {
rd *render.Render
@@ -144,7 +145,6 @@ func (s *balanceRangeScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster)
return allowed
}


// BalanceRangeCreateOption is used to create a scheduler with an option.
type BalanceRangeCreateOption func(s *balanceRangeScheduler)

@@ -174,21 +174,32 @@ func newBalanceRangeScheduler(opController *operator.Controller, conf *balanceRa
// Schedule schedules the balance key range operator.
func (s *balanceRangeScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) {

Check failure on line 175 in pkg/schedule/schedulers/balance_range.go (GitHub Actions / statics): unused-parameter: parameter 'dryRun' seems to be unused, consider removing or renaming it to match ^_ (revive)
balanceRangeCounter.Inc()
plan,err:=s.prepare(cluster)
opInfluence := s.OpController.GetOpInfluence(cluster.GetBasicCluster(), operator.WithRangeOption(s.conf.Ranges))
plan, err := s.prepare(cluster, opInfluence)
if err != nil {
log.Error("failed to prepare balance key range scheduler", errs.ZapError(err))
return nil, nil
}

downFilter := filter.NewRegionDownFilter()
replicaFilter := filter.NewRegionReplicatedFilter(cluster)
snapshotFilter := filter.NewSnapshotSendFilter(plan.stores, constant.Medium)
baseRegionFilters := []filter.RegionFilter{downFilter, replicaFilter, snapshotFilter}

for sourceIndex,sourceStore:=range plan.stores{
plan.source=sourceStore
switch s.role{
pendingFilter := filter.NewRegionPendingFilter()
baseRegionFilters := []filter.RegionFilter{downFilter, replicaFilter, snapshotFilter, pendingFilter}

for sourceIndex, sourceStore := range plan.stores {
plan.source = sourceStore
plan.sourceScore = plan.score(plan.source.GetID())
if plan.sourceScore < plan.averageScore {
break
}
switch s.role {
case Leader:
plan.region=filter.SelectOneRegion(cluster.RandLeaderRegions(plan.sourceStoreID(), s.conf.Ranges), nil,baseRegionFilters...)
plan.region = filter.SelectOneRegion(cluster.RandLeaderRegions(plan.sourceStoreID(), s.conf.Ranges), nil, baseRegionFilters...)
case Learner:
plan.region=filter.SelectOneRegion(cluster.RandLearnerRegions(plan.sourceStoreID(), s.conf.Ranges), nil,baseRegionFilters...)
plan.region = filter.SelectOneRegion(cluster.RandLearnerRegions(plan.sourceStoreID(), s.conf.Ranges), nil, baseRegionFilters...)
case Follower:
plan.region=filter.SelectOneRegion(cluster.RandFollowerRegions(plan.sourceStoreID(), s.conf.Ranges), nil,baseRegionFilters...)
plan.region = filter.SelectOneRegion(cluster.RandFollowerRegions(plan.sourceStoreID(), s.conf.Ranges), nil, baseRegionFilters...)
}
if plan.region == nil {
balanceRangeNoRegionCounter.Inc()
@@ -203,89 +214,171 @@ func (s *balanceRangeScheduler) Schedule(cluster sche.SchedulerCluster, dryRun b
}
// Check region leader
if plan.region.GetLeader() == nil {
log.Warn("region have no leader", zap.String("scheduler", s.GetName()), zap.Uint64("region-id", solver.Region.GetID()))
log.Warn("region have no leader", zap.String("scheduler", s.GetName()), zap.Uint64("region-id", plan.region.GetID()))
balanceRangeNoLeaderCounter.Inc()
continue
}
plan.fit = replicaFilter.(*filter.RegionReplicatedFilter).GetFit()
if op := s.transferPeer(plan, plan.stores[sourceIndex+1:]); op != nil {
op.Counters = append(op.Counters, balanceRegionNewOpCounter)
op.Counters = append(op.Counters, balanceRangeNewOperatorCounter)
return []*operator.Operator{op}, nil
}
}

if err != nil {

Check failure on line 228 in pkg/schedule/schedulers/balance_range.go (GitHub Actions / statics): empty-lines: extra empty line at the end of a block (revive)
log.Error("failed to prepare balance key range scheduler", errs.ZapError(err))
return nil,nil
return nil, nil

}

Check failure on line 232 in pkg/schedule/schedulers/balance_range.go (GitHub Actions / statics): unnecessary trailing newline (whitespace)
return nil, nil
}
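
Read as a whole, the new Schedule pass is a small control flow; the sketch below restates it with hypothetical helpers (pickRegion, pickTarget) purely for illustration and is not code from this commit.

package example

// storeScore pairs a store ID with its role-specific score inside the key range.
type storeScore struct {
	id    uint64
	score int64
}

// pickMove mirrors the loop in Schedule: walk stores from highest to lowest
// score, stop once a store falls below the average, try to pick a region on
// that store, then look for a lower-scored target among the remaining stores.
func pickMove(
	stores []storeScore, // sorted by score, descending
	average int64,
	pickRegion func(storeID uint64) (regionID uint64, ok bool),
	pickTarget func(candidates []storeScore, sourceScore int64) (targetID uint64, ok bool),
) (regionID, sourceID, targetID uint64, ok bool) {
	for i, src := range stores {
		if src.score < average {
			break // stores below the average are not balance sources
		}
		region, found := pickRegion(src.id)
		if !found {
			continue // no suitable region on this store, try the next one
		}
		if tgt, found := pickTarget(stores[i+1:], src.score); found {
			return region, src.id, tgt, true
		}
	}
	return 0, 0, 0, false
}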

// transferPeer selects the best store to create a new peer to replace the old peer.
func (s *balanceRangeScheduler) transferPeer(plan *balanceRangeSchedulerPlan, dstStores []*storeInfo) *operator.Operator {
func (s *balanceRangeScheduler) transferPeer(plan *balanceRangeSchedulerPlan, dstStores []*core.StoreInfo) *operator.Operator {
excludeTargets := plan.region.GetStoreIDs()
if s.role!=Leader{
excludeTargets = append(excludeTargets, plan.sourceStoreID())
if s.role != Leader {
excludeTargets = make(map[uint64]struct{})
}
conf := plan.GetSchedulerConfig()
filters := []filter.Filter{
filter.NewExcludedFilter(s.GetName(), nil, excludeTargets),
filter.NewPlacementSafeguard(s.GetName(), conf, plan.GetBasicCluster(), plan.GetRuleManager(), plan.region, plan.source, plan.fit),
}
candidates := filter.NewCandidates(s.R, dstStores).FilterTarget(conf, nil, s.filterCounter, filters...)
for i := range candidates.Stores {
plan.target = candidates.Stores[len(candidates.Stores)-i-1]
plan.targetScore = plan.score(plan.target.GetID())
if plan.targetScore > plan.averageScore {
break
}
regionID := plan.region.GetID()
sourceID := plan.source.GetID()
targetID := plan.target.GetID()
if !plan.shouldBalance(s.GetName()) {
continue
}
log.Debug("candidate store", zap.Uint64("region-id", regionID), zap.Uint64("source-store", sourceID), zap.Uint64("target-store", targetID))

oldPeer := plan.region.GetStorePeer(sourceID)
newPeer := &metapb.Peer{StoreId: plan.target.GetID(), Role: oldPeer.Role}
op, err := operator.CreateMovePeerOperator(s.GetName(), plan, plan.region, operator.OpRange, oldPeer.GetStoreId(), newPeer)
if err != nil {
balanceRangeCreateOpFailCounter.Inc()
return nil
}
sourceLabel := strconv.FormatUint(sourceID, 10)
targetLabel := strconv.FormatUint(targetID, 10)
op.FinishedCounters = append(op.FinishedCounters,
balanceDirectionCounter.WithLabelValues(s.GetName(), sourceLabel, targetLabel),
)
op.SetAdditionalInfo("sourceScore", strconv.FormatInt(plan.sourceScore, 10))
op.SetAdditionalInfo("targetScore", strconv.FormatInt(plan.targetScore, 10))
return op
}
balanceRangeNoReplacementCounter.Inc()
return nil
}

// balanceRangeSchedulerPlan is used to record the plan of balance key range scheduler.
type balanceRangeSchedulerPlan struct {
sche.SchedulerCluster
// stores is sorted by score desc
stores []*storeInfo
source *storeInfo
target *storeInfo
region *core.RegionInfo
fit *placement.RegionFit
stores []*core.StoreInfo
// sourceMap records the storeID -> score
sourceMap map[uint64]int64
source *core.StoreInfo
sourceScore int64
target *core.StoreInfo
targetScore int64
region *core.RegionInfo
fit *placement.RegionFit
averageScore int64
}

type storeInfo struct {
store *core.StoreInfo
score uint64
score int64
}

func (s *balanceRangeScheduler) prepare(cluster sche.SchedulerCluster)(*balanceRangeSchedulerPlan,error) {
func (s *balanceRangeScheduler) prepare(cluster sche.SchedulerCluster, opInfluence operator.OpInfluence) (*balanceRangeSchedulerPlan, error) {
krs := core.NewKeyRanges(s.conf.Ranges)
scanRegions, err := cluster.BatchScanRegions(krs)
if err != nil {
return nil,err
return nil, err
}
sources := filter.SelectSourceStores(cluster.GetStores(), s.filters, cluster.GetSchedulerConfig(), nil, nil)
storeInfos:=make(map[uint64]*storeInfo,len(sources))
storeInfos := make(map[uint64]*storeInfo, len(sources))
for _, source := range sources {
storeInfos[source.GetID()] = &storeInfo{store: source}
}
totalScore := int64(0)
for _, region := range scanRegions {
for _, peer := range s.role.getPeers(region) {
storeInfos[peer.GetStoreId()].score += 1
totalScore += 1
}
}

stores:=make([]*storeInfo,0,len(storeInfos))
for _, store := range storeInfos {
stores = append(stores, store)
storeList := make([]*storeInfo, 0, len(storeInfos))
for storeID, store := range storeInfos {
if influence := opInfluence.GetStoreInfluence(storeID); influence != nil {
store.score += s.role.getStoreInfluence(influence)
}
storeList = append(storeList, store)
}
sort.Slice(stores, func(i, j int) bool {
return stores[i].score > stores[j].score
sort.Slice(storeList, func(i, j int) bool {
return storeList[i].score > storeList[j].score
})
sourceMap := make(map[uint64]int64)
for _, store := range storeList {
sourceMap[store.store.GetID()] = store.score
}

stores := make([]*core.StoreInfo, 0, len(storeList))
for _, store := range storeList {
stores = append(stores, store.store)
}
averageScore := totalScore / int64(len(storeList))
return &balanceRangeSchedulerPlan{
stores:stores,
source: nil,
target: nil,
region: nil,
},nil
SchedulerCluster: cluster,
stores: stores,
sourceMap: sourceMap,
source: nil,
target: nil,
region: nil,
averageScore: averageScore,
}, nil
}
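
A small worked sketch of the arithmetic in prepare, on made-up numbers: each store's score starts as the number of peers of the scheduled role found inside the key ranges, the influence of in-flight operators is then added, and the average is the raw total divided by the number of candidate stores.

package example

// scoreSketch reproduces prepare's scoring on hypothetical data: store 1 holds
// 30 matching peers, store 2 holds 20, store 3 holds 10, and pending operators
// are expected to move 2 peers from store 1 to store 3.
func scoreSketch() (scores map[uint64]int64, average int64) {
	peerCounts := map[uint64]int64{1: 30, 2: 20, 3: 10}
	opInfluence := map[uint64]int64{1: -2, 2: 0, 3: 2}

	scores = make(map[uint64]int64, len(peerCounts))
	var total int64
	for storeID, cnt := range peerCounts {
		total += cnt // totalScore counts only the scanned peers, not the influence
		scores[storeID] = cnt + opInfluence[storeID]
	}
	average = total / int64(len(peerCounts)) // 60 / 3 = 20
	return scores, average                   // {1: 28, 2: 20, 3: 12}, 20
}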

func (p *balanceRangeSchedulerPlan) sourceStoreID() uint64 {
return p.source.store.GetID()
return p.source.GetID()
}

func (p *balanceRangeSchedulerPlan) targetStoreID() uint64 {
return p.target.store.GetID()
return p.target.GetID()
}

func (p *balanceRangeSchedulerPlan) score(storeID uint64) int64 {
return p.sourceMap[storeID]
}

func (p *balanceRangeSchedulerPlan) shouldBalance(scheduler string) bool {
sourceScore := p.score(p.sourceStoreID())
targetScore := p.score(p.targetStoreID())
shouldBalance := sourceScore > targetScore
if !shouldBalance && log.GetLevel() <= zap.DebugLevel {
log.Debug("skip balance ",
zap.String("scheduler", scheduler),
zap.Uint64("region-id", p.region.GetID()),
zap.Uint64("source-store", p.sourceStoreID()),
zap.Uint64("target-store", p.targetStoreID()),
zap.Int64("source-score", p.sourceScore),
zap.Int64("target-score", p.targetScore),
zap.Int64("average-region-size", p.averageScore),
)
}
return shouldBalance
}

type Role int

Check failure on line 383 in pkg/schedule/schedulers/balance_range.go (GitHub Actions / statics): exported: exported type Role should have comment or be unexported (revive)

@@ -310,33 +403,33 @@ func (r Role) String() string {
}
}

func NewRole(role string) Role {
switch role {
case "leader":
return Leader
case "follower":
return Follower
case "learner":
return Learner
func (r Role) getPeers(region *core.RegionInfo) []*metapb.Peer {
switch r {
case Leader:
return []*metapb.Peer{region.GetLeader()}
case Follower:
followers := region.GetFollowers()
ret := make([]*metapb.Peer, len(followers))
for _, peer := range followers {
ret = append(ret, peer)

Check failure on line 414 in pkg/schedule/schedulers/balance_range.go (GitHub Actions / statics): append to slice `ret` with non-zero initialized length (makezero)
}
return ret
case Learner:
return region.GetLearners()
default:
return Unknown
return nil
}
}

func (r Role) getPeers(region *core.RegionInfo) []*metapb.Peer {{
func (r Role) getStoreInfluence(influence *operator.StoreInfluence) int64 {
switch r {
case Leader:
return []*metapb.Peer{region.GetLeader()}
return influence.LeaderCount
case Follower:
followers:=region.GetFollowers()
ret:=make([]*metapb.Peer,len(followers))
for _,peer:=range followers{
ret=append(ret,peer)
}
return ret
return influence.RegionCount
case Learner:
return region.GetLearners()
return influence.RegionCount
default:
return nil
return 0
}
}
