schedulers: add reload config function #7406

Merged · 8 commits · Nov 22, 2023
Changes from 3 commits
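Across the touched schedulers, this change embeds syncutil.RWMutex directly in the config struct (replacing the named mu field) so locking goes through conf.Lock()/conf.RLock(), and each scheduler gains a ReloadConfig method that re-reads its persisted configuration from storage. The hunks below truncate the method bodies, so the following is only a minimal sketch of the shared pattern, assuming a DecodeConfig helper symmetric to the EncodeConfig call visible in the diff; it is not taken verbatim from the PR.

// Sketch only: everything past the truncated hunks is an assumption.
func (l *balanceLeaderScheduler) ReloadConfig() error {
    l.conf.Lock()
    defer l.conf.Unlock()
    cfgData, err := l.conf.storage.LoadSchedulerConfig(l.GetName())
    if err != nil {
        return err
    }
    if len(cfgData) == 0 {
        // Nothing persisted yet; keep the in-memory config.
        return nil
    }
    newCfg := &balanceLeaderSchedulerConfig{}
    if err := DecodeConfig([]byte(cfgData), newCfg); err != nil { // assumed helper
        return err
    }
    l.conf.Ranges = newCfg.Ranges
    l.conf.Batch = newCfg.Batch
    return nil
}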
22 changes: 11 additions & 11 deletions pkg/schedule/schedulers/balance_leader.go
@@ -67,16 +67,16 @@ var (
)

type balanceLeaderSchedulerConfig struct {
- mu syncutil.RWMutex
+ syncutil.RWMutex
storage endpoint.ConfigStorage
Ranges []core.KeyRange `json:"ranges"`
// Batch is used to generate multiple operators by one scheduling
Batch int `json:"batch"`
}

func (conf *balanceLeaderSchedulerConfig) Update(data []byte) (int, interface{}) {
- conf.mu.Lock()
- defer conf.mu.Unlock()
+ conf.Lock()
+ defer conf.Unlock()

oldc, _ := json.Marshal(conf)

@@ -109,8 +109,8 @@ func (conf *balanceLeaderSchedulerConfig) validate() bool {
}

func (conf *balanceLeaderSchedulerConfig) Clone() *balanceLeaderSchedulerConfig {
- conf.mu.RLock()
- defer conf.mu.RUnlock()
+ conf.RLock()
+ defer conf.RUnlock()
ranges := make([]core.KeyRange, len(conf.Ranges))
copy(ranges, conf.Ranges)
return &balanceLeaderSchedulerConfig{
@@ -210,14 +210,14 @@ func (l *balanceLeaderScheduler) GetType() string {
}

func (l *balanceLeaderScheduler) EncodeConfig() ([]byte, error) {
- l.conf.mu.RLock()
- defer l.conf.mu.RUnlock()
+ l.conf.RLock()
+ defer l.conf.RUnlock()
return EncodeConfig(l.conf)
}

func (l *balanceLeaderScheduler) ReloadConfig() error {
- l.conf.mu.Lock()
- defer l.conf.mu.Unlock()
+ l.conf.Lock()
+ defer l.conf.Unlock()
cfgData, err := l.conf.storage.LoadSchedulerConfig(l.GetName())
if err != nil {
return err
@@ -335,8 +335,8 @@ func (cs *candidateStores) resortStoreWithPos(pos int) {
}

func (l *balanceLeaderScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) {
- l.conf.mu.RLock()
- defer l.conf.mu.RUnlock()
+ l.conf.RLock()
+ defer l.conf.RUnlock()
basePlan := plan.NewBalanceSchedulerPlan()
var collector *plan.Collector
if dryRun {
2 changes: 2 additions & 0 deletions pkg/schedule/schedulers/balance_region.go
@@ -51,6 +51,8 @@ var (
type balanceRegionSchedulerConfig struct {
Name string `json:"name"`
Ranges []core.KeyRange `json:"ranges"`
+ // TODO: whether to support reload it?
[Review thread on the TODO above]

@lhy1024 (Contributor, Author) · Nov 21, 2023:
We need to talk about the schedulers whose config only contains Ranges. Do we need to support ReloadConfig for them?

@lhy1024 (Contributor, Author):
After discussion, we will keep TODO for these schedulers.

+ // Maybe we need to ensure whether scatter-range-scheduler?
}

type balanceRegionScheduler struct {
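For the schedulers discussed in the review thread above, whose config only carries Ranges (left as a TODO in this change), a reload would reduce to re-reading the key ranges. A hedged sketch follows, assuming the scheduler keeps a conf field with access to the same config storage and DecodeConfig helper used by the other schedulers; none of this is part of the PR.

// Hypothetical sketch only; the PR intentionally keeps this as a TODO.
func (s *balanceRegionScheduler) ReloadConfig() error {
    cfgData, err := s.conf.storage.LoadSchedulerConfig(s.GetName()) // assumes conf exposes a storage handle
    if err != nil {
        return err
    }
    if len(cfgData) == 0 {
        return nil
    }
    newCfg := &balanceRegionSchedulerConfig{}
    if err := DecodeConfig([]byte(cfgData), newCfg); err != nil { // assumed helper
        return err
    }
    s.conf.Ranges = newCfg.Ranges
    return nil
}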
22 changes: 11 additions & 11 deletions pkg/schedule/schedulers/balance_witness.go
@@ -53,16 +53,16 @@ const (
)

type balanceWitnessSchedulerConfig struct {
- mu syncutil.RWMutex
+ syncutil.RWMutex
storage endpoint.ConfigStorage
Ranges []core.KeyRange `json:"ranges"`
// Batch is used to generate multiple operators by one scheduling
Batch int `json:"batch"`
}

func (conf *balanceWitnessSchedulerConfig) Update(data []byte) (int, interface{}) {
- conf.mu.Lock()
- defer conf.mu.Unlock()
+ conf.Lock()
+ defer conf.Unlock()

oldc, _ := json.Marshal(conf)

@@ -95,8 +95,8 @@ func (conf *balanceWitnessSchedulerConfig) validate() bool {
}

func (conf *balanceWitnessSchedulerConfig) Clone() *balanceWitnessSchedulerConfig {
- conf.mu.RLock()
- defer conf.mu.RUnlock()
+ conf.RLock()
+ defer conf.RUnlock()
ranges := make([]core.KeyRange, len(conf.Ranges))
copy(ranges, conf.Ranges)
return &balanceWitnessSchedulerConfig{
@@ -205,14 +205,14 @@ func (b *balanceWitnessScheduler) GetType() string {
}

func (b *balanceWitnessScheduler) EncodeConfig() ([]byte, error) {
- b.conf.mu.RLock()
- defer b.conf.mu.RUnlock()
+ b.conf.RLock()
+ defer b.conf.RUnlock()
return EncodeConfig(b.conf)
}

func (b *balanceWitnessScheduler) ReloadConfig() error {
- b.conf.mu.Lock()
- defer b.conf.mu.Unlock()
+ b.conf.Lock()
+ defer b.conf.Unlock()
cfgData, err := b.conf.storage.LoadSchedulerConfig(b.GetName())
if err != nil {
return err
@@ -238,8 +238,8 @@ func (b *balanceWitnessScheduler) IsScheduleAllowed(cluster sche.SchedulerCluste
}

func (b *balanceWitnessScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) {
- b.conf.mu.RLock()
- defer b.conf.mu.RUnlock()
+ b.conf.RLock()
+ defer b.conf.RUnlock()
basePlan := plan.NewBalanceSchedulerPlan()
var collector *plan.Collector
if dryRun {
72 changes: 28 additions & 44 deletions pkg/schedule/schedulers/evict_leader.go
@@ -56,16 +56,16 @@
)

type evictLeaderSchedulerConfig struct {
- mu syncutil.RWMutex
+ syncutil.RWMutex
storage endpoint.ConfigStorage
StoreIDWithRanges map[uint64][]core.KeyRange `json:"store-id-ranges"`
cluster *core.BasicCluster
removeSchedulerCb func(string) error
}

func (conf *evictLeaderSchedulerConfig) getStores() []uint64 {
- conf.mu.RLock()
- defer conf.mu.RUnlock()
+ conf.RLock()
+ defer conf.RUnlock()
stores := make([]uint64, 0, len(conf.StoreIDWithRanges))
for storeID := range conf.StoreIDWithRanges {
stores = append(stores, storeID)
@@ -86,15 +86,15 @@
if err != nil {
return err
}
- conf.mu.Lock()
- defer conf.mu.Unlock()
+ conf.Lock()
+ defer conf.Unlock()
conf.StoreIDWithRanges[id] = ranges
return nil
}

func (conf *evictLeaderSchedulerConfig) Clone() *evictLeaderSchedulerConfig {
- conf.mu.RLock()
- defer conf.mu.RUnlock()
+ conf.RLock()
+ defer conf.RUnlock()
storeIDWithRanges := make(map[uint64][]core.KeyRange)
for id, ranges := range conf.StoreIDWithRanges {
storeIDWithRanges[id] = append(storeIDWithRanges[id], ranges...)
@@ -106,8 +106,8 @@

func (conf *evictLeaderSchedulerConfig) Persist() error {
name := conf.getSchedulerName()
- conf.mu.RLock()
- defer conf.mu.RUnlock()
+ conf.RLock()
+ defer conf.RUnlock()
data, err := EncodeConfig(conf)
failpoint.Inject("persistFail", func() {
err = errors.New("fail to persist")
@@ -123,8 +123,8 @@
}

func (conf *evictLeaderSchedulerConfig) getRanges(id uint64) []string {
- conf.mu.RLock()
- defer conf.mu.RUnlock()
+ conf.RLock()
+ defer conf.RUnlock()
ranges := conf.StoreIDWithRanges[id]
res := make([]string, 0, len(ranges)*2)
for index := range ranges {
@@ -134,8 +134,8 @@
}

func (conf *evictLeaderSchedulerConfig) removeStore(id uint64) (succ bool, last bool) {
- conf.mu.Lock()
- defer conf.mu.Unlock()
+ conf.Lock()
+ defer conf.Unlock()
_, exists := conf.StoreIDWithRanges[id]
succ, last = false, false
if exists {
@@ -148,15 +148,15 @@
}

func (conf *evictLeaderSchedulerConfig) resetStore(id uint64, keyRange []core.KeyRange) {
- conf.mu.Lock()
- defer conf.mu.Unlock()
+ conf.Lock()
+ defer conf.Unlock()
conf.cluster.PauseLeaderTransfer(id)
conf.StoreIDWithRanges[id] = keyRange
}

func (conf *evictLeaderSchedulerConfig) getKeyRangesByID(id uint64) []core.KeyRange {
- conf.mu.RLock()
- defer conf.mu.RUnlock()
+ conf.RLock()
+ defer conf.RUnlock()
if ranges, exist := conf.StoreIDWithRanges[id]; exist {
return ranges
}
@@ -199,14 +199,14 @@
}

func (s *evictLeaderScheduler) EncodeConfig() ([]byte, error) {
- s.conf.mu.RLock()
- defer s.conf.mu.RUnlock()
+ s.conf.RLock()
+ defer s.conf.RUnlock()
return EncodeConfig(s.conf)
}

func (s *evictLeaderScheduler) ReloadConfig() error {
- s.conf.mu.Lock()
- defer s.conf.mu.Unlock()
+ s.conf.Lock()
+ defer s.conf.Unlock()
cfgData, err := s.conf.storage.LoadSchedulerConfig(s.GetName())
if err != nil {
return err
@@ -223,25 +223,9 @@
return nil
}

- // pauseAndResumeLeaderTransfer checks the old and new store IDs, and pause or resume the leader transfer.
- func pauseAndResumeLeaderTransfer(cluster *core.BasicCluster, old, new map[uint64][]core.KeyRange) {
- for id := range old {
- if _, ok := new[id]; ok {
- continue
- }
- cluster.ResumeLeaderTransfer(id)
- }
- for id := range new {
- if _, ok := old[id]; ok {
- continue
- }
- cluster.PauseLeaderTransfer(id)
- }
- }

func (s *evictLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) error {
- s.conf.mu.RLock()
- defer s.conf.mu.RUnlock()
+ s.conf.RLock()
+ defer s.conf.RUnlock()
var res error
for id := range s.conf.StoreIDWithRanges {
if err := cluster.PauseLeaderTransfer(id); err != nil {
@@ -252,8 +252,8 @@
}

func (s *evictLeaderScheduler) CleanConfig(cluster sche.SchedulerCluster) {
- s.conf.mu.RLock()
- defer s.conf.mu.RUnlock()
+ s.conf.RLock()
+ defer s.conf.RUnlock()
for id := range s.conf.StoreIDWithRanges {
cluster.ResumeLeaderTransfer(id)
}
@@ -382,15 +366,15 @@
idFloat, ok := input["store_id"].(float64)
if ok {
id = (uint64)(idFloat)
- handler.config.mu.RLock()
+ handler.config.RLock()
if _, exists = handler.config.StoreIDWithRanges[id]; !exists {
if err := handler.config.cluster.PauseLeaderTransfer(id); err != nil {
- handler.config.mu.RUnlock()
+ handler.config.RUnlock()

[Codecov / codecov/patch warning on pkg/schedule/schedulers/evict_leader.go#L372: added line was not covered by tests]
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
}
- handler.config.mu.RUnlock()
+ handler.config.RUnlock()
args = append(args, strconv.FormatUint(id, 10))
}

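The pauseAndResumeLeaderTransfer helper removed from this file above diffs the old and new store-ID maps, resuming leader transfer for stores that dropped out and pausing it for stores that were added. Below is a hedged sketch of how a reload path could pair that helper with the evict-leader config; it assumes the helper remains available somewhere in the package after this change and is not code from the PR.

// Hypothetical usage sketch; not taken verbatim from the PR.
func applyReloadedEvictLeaderConfig(conf *evictLeaderSchedulerConfig, newStores map[uint64][]core.KeyRange) {
    conf.Lock()
    defer conf.Unlock()
    // Resume transfer for stores that are no longer evicted, pause it for newly added ones.
    pauseAndResumeLeaderTransfer(conf.cluster, conf.StoreIDWithRanges, newStores)
    conf.StoreIDWithRanges = newStores
}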