Skip to content

Commit

Permalink
replication_mode: fix the state cannot switch to async while existing…
Browse files Browse the repository at this point in the history
… learner node (#6452) (#6483)

ref #4399, ref #6452, close tikv/tikv#14704

replication_mode: fix the state being unable to switch to async while a learner node exists
 - skip learner nodes when checking the state of the stores

Signed-off-by: ti-chi-bot <[email protected]>
Signed-off-by: nolouch <[email protected]>

Co-authored-by: ShuNing <[email protected]>
Co-authored-by: nolouch <[email protected]>
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored May 24, 2023
1 parent d086bd5 commit cb52d28
Show file tree
Hide file tree
Showing 9 changed files with 45 additions and 10 deletions.
7 changes: 7 additions & 0 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,13 @@ func (mc *Cluster) AddLabelsStore(storeID uint64, regionCount int, labels map[st
mc.PutStore(store)
}

// AddLabersStoreWithLearnerCount registers a labeled store through AddLabelsStore
// using the given region count and labels, then replaces it with a clone whose
// learner count is set to learnerCount.
func (mc *Cluster) AddLabersStoreWithLearnerCount(storeID uint64, regionCount int, learnerCount int, labels map[string]string) {
	mc.AddLabelsStore(storeID, regionCount, labels)
	// Re-read the freshly added store and put back a copy carrying the learner count.
	withLearners := core.SetLearnerCount(learnerCount)
	mc.PutStore(mc.GetStore(storeID).Clone(withLearners))
}

// AddLeaderRegion adds region with specified leader and followers.
func (mc *Cluster) AddLeaderRegion(regionID uint64, leaderStoreID uint64, otherPeerStoreIDs ...uint64) *core.RegionInfo {
origin := mc.newMockRegionInfo(regionID, leaderStoreID, otherPeerStoreIDs...)
Expand Down
6 changes: 4 additions & 2 deletions server/api/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ type StoreStatus struct {
RegionWeight float64 `json:"region_weight"`
RegionScore float64 `json:"region_score"`
RegionSize int64 `json:"region_size"`
WitnessCount int `json:"witness_count"`
SlowScore uint64 `json:"slow_score"`
LearnerCount int `json:"learner_count,omitempty"`
WitnessCount int `json:"witness_count,omitempty"`
SlowScore uint64 `json:"slow_score,omitempty"`
SendingSnapCount uint32 `json:"sending_snap_count,omitempty"`
ReceivingSnapCount uint32 `json:"receiving_snap_count,omitempty"`
IsBusy bool `json:"is_busy,omitempty"`
Expand Down Expand Up @@ -94,6 +95,7 @@ func newStoreInfo(opt *config.ScheduleConfig, store *core.StoreInfo) *StoreInfo
RegionWeight: store.GetRegionWeight(),
RegionScore: store.RegionScore(opt.RegionScoreFormulaVersion, opt.HighSpaceRatio, opt.LowSpaceRatio, 0),
RegionSize: store.GetRegionSize(),
LearnerCount: store.GetLearnerCount(),
WitnessCount: store.GetWitnessCount(),
SlowScore: store.GetSlowScore(),
SendingSnapCount: store.GetSendingSnapCount(),
Expand Down
4 changes: 2 additions & 2 deletions server/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,10 @@ func (bc *BasicCluster) ResetStoreLimit(storeID uint64, limitType storelimit.Typ

// UpdateStoreStatus updates the information of the store.
func (bc *BasicCluster) UpdateStoreStatus(storeID uint64) {
leaderCount, regionCount, witnessCount, pendingPeerCount, leaderRegionSize, regionSize := bc.RegionsInfo.GetStoreStats(storeID)
leaderCount, regionCount, witnessCount, learnerCount, pendingPeerCount, leaderRegionSize, regionSize := bc.RegionsInfo.GetStoreStats(storeID)
bc.Stores.mu.Lock()
defer bc.Stores.mu.Unlock()
bc.Stores.UpdateStoreStatus(storeID, leaderCount, regionCount, pendingPeerCount, leaderRegionSize, regionSize, witnessCount)
bc.Stores.UpdateStoreStatus(storeID, leaderCount, regionCount, witnessCount, learnerCount, pendingPeerCount, leaderRegionSize, regionSize)
}

// PutStore put a store.
Expand Down
4 changes: 2 additions & 2 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -1171,11 +1171,11 @@ func (r *RegionsInfo) GetMetaRegions() []*metapb.Region {
}

// GetStoreStats returns the store stats.
func (r *RegionsInfo) GetStoreStats(storeID uint64) (leader, region, witness, pending int, leaderSize, regionSize int64) {
func (r *RegionsInfo) GetStoreStats(storeID uint64) (leader, region, witness, learner, pending int, leaderSize, regionSize int64) {
r.st.RLock()
defer r.st.RUnlock()
return r.leaders[storeID].length(), r.getStoreRegionCountLocked(storeID), r.witnesses[storeID].length(),
r.pendingPeers[storeID].length(), r.leaders[storeID].TotalSize(), r.getStoreRegionSizeLocked(storeID)
r.learners[storeID].length(), r.pendingPeers[storeID].length(), r.leaders[storeID].TotalSize(), r.getStoreRegionSizeLocked(storeID)
}

// GetRegionCount gets the total count of RegionInfo of regionMap
Expand Down
9 changes: 8 additions & 1 deletion server/core/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type StoreInfo struct {
slowStoreEvicted bool // this store has been evicted as a slow store, should not transfer leader to it
leaderCount int
regionCount int
learnerCount int
witnessCount int
leaderSize int64
regionSize int64
Expand Down Expand Up @@ -221,6 +222,11 @@ func (s *StoreInfo) GetRegionCount() int {
return s.regionCount
}

// GetLearnerCount returns the learner count of the store, i.e. the value of
// its learnerCount field, which is populated by the SetLearnerCount option.
func (s *StoreInfo) GetLearnerCount() int {
return s.learnerCount
}

// GetWitnessCount returns the witness count of the store.
func (s *StoreInfo) GetWitnessCount() int {
return s.witnessCount
Expand Down Expand Up @@ -709,11 +715,12 @@ func (s *StoresInfo) SetRegionSize(storeID uint64, regionSize int64) {
}

// UpdateStoreStatus updates the information of the store.
func (s *StoresInfo) UpdateStoreStatus(storeID uint64, leaderCount int, regionCount int, pendingPeerCount int, leaderSize int64, regionSize int64, witnessCount int) {
func (s *StoresInfo) UpdateStoreStatus(storeID uint64, leaderCount, regionCount, witnessCount, learnerCount, pendingPeerCount int, leaderSize int64, regionSize int64) {
if store, ok := s.stores[storeID]; ok {
newStore := store.ShallowClone(SetLeaderCount(leaderCount),
SetRegionCount(regionCount),
SetWitnessCount(witnessCount),
SetLearnerCount(learnerCount),
SetPendingPeerCount(pendingPeerCount),
SetLeaderSize(leaderSize),
SetRegionSize(regionSize))
Expand Down
7 changes: 7 additions & 0 deletions server/core/store_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,13 @@ func SetRegionCount(regionCount int) StoreCreateOption {
}
}

// SetLearnerCount returns a StoreCreateOption that overwrites the learner
// count of the store it is applied to with learnerCount.
func SetLearnerCount(learnerCount int) StoreCreateOption {
	return func(s *StoreInfo) {
		s.learnerCount = learnerCount
	}
}

// SetWitnessCount sets the witness count for the store.
func SetWitnessCount(witnessCount int) StoreCreateOption {
return func(store *StoreInfo) {
Expand Down
4 changes: 4 additions & 0 deletions server/replication/replication_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,10 @@ func (m *ModeManager) checkStoreStatus() [][]uint64 {
if s.IsRemoved() {
continue
}
// Learner peers do not participate in majority commit or voting, so a store that holds only learner peers should not be counted as a normal primary/dr store.
if s.GetRegionCount() == s.GetLearnerCount() {
continue
}
down := s.DownTime() >= m.config.DRAutoSync.WaitStoreTimeout.Duration
labelValue := s.GetLabelValue(m.config.DRAutoSync.LabelKey)
if labelValue == m.config.DRAutoSync.Primary {
Expand Down
10 changes: 7 additions & 3 deletions server/replication/replication_mode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func TestStateSwitch(t *testing.T) {
Primary: "zone1",
DR: "zone2",
PrimaryReplicas: 4,
DRReplicas: 1,
DRReplicas: 2,
WaitStoreTimeout: typeutil.Duration{Duration: time.Minute},
}}
cluster := mockcluster.NewCluster(ctx, config.NewTestOptions())
Expand Down Expand Up @@ -214,7 +214,7 @@ func TestStateSwitch(t *testing.T) {

// add new store in dr zone.
cluster.AddLabelsStore(5, 1, map[string]string{"zone": "zone2"})
cluster.AddLabelsStore(6, 1, map[string]string{"zone": "zone2"})
cluster.AddLabersStoreWithLearnerCount(6, 1, 1, map[string]string{"zone": "zone2"})
// async -> sync
rep.tickDR()
re.Equal(drStateSyncRecover, rep.drGetState())
Expand All @@ -233,10 +233,14 @@ func TestStateSwitch(t *testing.T) {
rep.tickDR()
re.Equal(drStateSync, rep.drGetState()) // cannot guarantee majority, keep sync.

setStoreState(cluster, "up", "up", "up", "up", "up", "down")
rep.tickDR()
re.Equal(drStateSync, rep.drGetState())

// once the voter node is down, even if the learner node is up, switch to async state.
setStoreState(cluster, "up", "up", "up", "up", "down", "up")
rep.tickDR()
re.Equal(drStateAsyncWait, rep.drGetState())
assertStateIDUpdate()

rep.drSwitchToSync()
replicator.errors[2] = errors.New("fail to replicate")
Expand Down
4 changes: 4 additions & 0 deletions server/statistics/store_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type storeStatistics struct {
StorageCapacity uint64
RegionCount int
LeaderCount int
LearnerCount int
WitnessCount int
LabelCounter map[string]int
Preparing int
Expand Down Expand Up @@ -119,6 +120,7 @@ func (s *storeStatistics) Observe(store *core.StoreInfo, stats *StoresStats) {
storeStatusGauge.WithLabelValues(storeAddress, id, "leader_size").Set(float64(store.GetLeaderSize()))
storeStatusGauge.WithLabelValues(storeAddress, id, "leader_count").Set(float64(store.GetLeaderCount()))
storeStatusGauge.WithLabelValues(storeAddress, id, "witness_count").Set(float64(store.GetWitnessCount()))
storeStatusGauge.WithLabelValues(storeAddress, id, "learner_count").Set(float64(store.GetLearnerCount()))
storeStatusGauge.WithLabelValues(storeAddress, id, "store_available").Set(float64(store.GetAvailable()))
storeStatusGauge.WithLabelValues(storeAddress, id, "store_used").Set(float64(store.GetUsedSize()))
storeStatusGauge.WithLabelValues(storeAddress, id, "store_capacity").Set(float64(store.GetCapacity()))
Expand Down Expand Up @@ -170,6 +172,7 @@ func (s *storeStatistics) Collect() {
metrics["region_count"] = float64(s.RegionCount)
metrics["leader_count"] = float64(s.LeaderCount)
metrics["witness_count"] = float64(s.WitnessCount)
metrics["learner_count"] = float64(s.LearnerCount)
metrics["storage_size"] = float64(s.StorageSize)
metrics["storage_capacity"] = float64(s.StorageCapacity)

Expand Down Expand Up @@ -241,6 +244,7 @@ func (s *storeStatistics) resetStoreStatistics(storeAddress string, id string) {
"leader_size",
"leader_count",
"witness_count",
"learner_count",
"store_available",
"store_used",
"store_capacity",
Expand Down

0 comments on commit cb52d28

Please sign in to comment.