diff --git a/pkg/core/store_option.go b/pkg/core/store_option.go index 8a2aa1ef089..0bdfaffa44f 100644 --- a/pkg/core/store_option.go +++ b/pkg/core/store_option.go @@ -274,3 +274,23 @@ func SetLastAwakenTime(lastAwaken time.Time) StoreCreateOption { store.lastAwakenTime = lastAwaken } } + +// SetStoreMeta sets the meta for the store. +func SetStoreMeta(newMeta *metapb.Store) StoreCreateOption { + return func(store *StoreInfo) { + meta := typeutil.DeepClone(store.meta, StoreFactory) + meta.Version = newMeta.GetVersion() + meta.GitHash = newMeta.GetGitHash() + meta.Address = newMeta.GetAddress() + meta.StatusAddress = newMeta.GetStatusAddress() + meta.PeerAddress = newMeta.GetPeerAddress() + meta.StartTimestamp = newMeta.GetStartTimestamp() + meta.DeployPath = newMeta.GetDeployPath() + meta.LastHeartbeat = newMeta.GetLastHeartbeat() + meta.State = newMeta.GetState() + meta.Labels = newMeta.GetLabels() + meta.NodeState = newMeta.GetNodeState() + meta.PhysicallyDestroyed = newMeta.GetPhysicallyDestroyed() + store.meta = meta + } +} diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index ac15212553b..96ee88259da 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -381,8 +381,7 @@ func (c *Cluster) HandleStoreHeartbeat(heartbeat *schedulingpb.StoreHeartbeatReq return errors.Errorf("store %v not found", storeID) } - nowTime := time.Now() - newStore := store.Clone(core.SetStoreStats(stats), core.SetLastHeartbeatTS(nowTime)) + newStore := store.Clone(core.SetStoreStats(stats)) if store := c.GetStore(storeID); store != nil { statistics.UpdateStoreHeartbeatMetrics(store) diff --git a/pkg/mcs/scheduling/server/meta/watcher.go b/pkg/mcs/scheduling/server/meta/watcher.go index 3dbd0fc8c92..3a04c261163 100644 --- a/pkg/mcs/scheduling/server/meta/watcher.go +++ b/pkg/mcs/scheduling/server/meta/watcher.go @@ -81,7 +81,7 @@ func (w *Watcher) initializeStoreWatcher() error { w.basicCluster.PutStore(core.NewStoreInfo(store)) return nil } - w.basicCluster.PutStore(origin.Clone(core.SetStoreState(store.GetState(), store.GetPhysicallyDestroyed()))) + w.basicCluster.PutStore(origin.Clone(core.SetStoreMeta(store))) return nil } deleteFn := func(kv *mvccpb.KeyValue) error { diff --git a/tests/integrations/mcs/scheduling/meta_test.go b/tests/integrations/mcs/scheduling/meta_test.go index 74497e0b552..ce0dc620aef 100644 --- a/tests/integrations/mcs/scheduling/meta_test.go +++ b/tests/integrations/mcs/scheduling/meta_test.go @@ -99,4 +99,15 @@ func (suite *metaTestSuite) TestStoreWatch() { testutil.Eventually(re, func() bool { return cluster.GetStore(2) == nil }) + + // test synchronized store labels + suite.pdLeaderServer.GetServer().GetRaftCluster().PutStore( + &metapb.Store{Id: 5, Address: "mock-5", State: metapb.StoreState_Up, NodeState: metapb.NodeState_Serving, LastHeartbeat: time.Now().UnixNano(), Labels: []*metapb.StoreLabel{{Key: "zone", Value: "z1"}}}, + ) + testutil.Eventually(re, func() bool { + if len(cluster.GetStore(5).GetLabels()) == 0 { + return false + } + return cluster.GetStore(5).GetLabels()[0].GetValue() == "z1" + }) } diff --git a/tests/integrations/mcs/scheduling/server_test.go b/tests/integrations/mcs/scheduling/server_test.go index 41c00b8e9b4..eb99411d27e 100644 --- a/tests/integrations/mcs/scheduling/server_test.go +++ b/tests/integrations/mcs/scheduling/server_test.go @@ -59,7 +59,7 @@ func TestServerTestSuite(t *testing.T) { func (suite *serverTestSuite) SetupSuite() { var err error re := suite.Require() - + re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs", `return(true)`)) suite.ctx, suite.cancel = context.WithCancel(context.Background()) suite.cluster, err = tests.NewTestAPICluster(suite.ctx, 3) re.NoError(err) @@ -76,6 +76,7 @@ func (suite *serverTestSuite) SetupSuite() { func (suite *serverTestSuite) TearDownSuite() { suite.cluster.Destroy() suite.cancel() + suite.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs")) } func (suite *serverTestSuite) TestAllocID() { diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index ccb469c04cb..6d233a8c8ab 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -510,6 +510,7 @@ func TestRaftClusterMultipleRestart(t *testing.T) { err = rc.PutStore(store) re.NoError(err) re.NotNil(tc) + rc.Stop() // let the job run at small interval re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs", `return(true)`))