Skip to content

Commit

Permalink
fix data race of tests
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Nov 20, 2023
1 parent 9845c12 commit f2f8b84
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 45 deletions.
8 changes: 4 additions & 4 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,14 +178,15 @@ type Status struct {
}

// NewRaftCluster create a new cluster.
func NewRaftCluster(ctx context.Context, clusterID uint64, regionSyncer *syncer.RegionSyncer, etcdClient *clientv3.Client,
func NewRaftCluster(ctx context.Context, clusterID uint64, basicCluster *core.BasicCluster, regionSyncer *syncer.RegionSyncer, etcdClient *clientv3.Client,
httpClient *http.Client) *RaftCluster {
return &RaftCluster{
serverCtx: ctx,
clusterID: clusterID,
regionSyncer: regionSyncer,
httpClient: httpClient,
etcdClient: etcdClient,
core: basicCluster,
}
}

Expand Down Expand Up @@ -259,10 +260,9 @@ func (c *RaftCluster) InitCluster(
id id.Allocator,
opt sc.ConfProvider,
storage storage.Storage,
basicCluster *core.BasicCluster,
hbstreams *hbstream.HeartbeatStreams,
keyspaceGroupManager *keyspace.GroupManager) error {
c.core, c.opt, c.storage, c.id = basicCluster, opt.(*config.PersistOptions), storage, id
c.opt, c.storage, c.id = opt.(*config.PersistOptions), storage, id
c.ctx, c.cancel = context.WithCancel(c.serverCtx)
c.progressManager = progress.NewManager()
c.changedRegions = make(chan *core.RegionInfo, defaultChangedRegionsLimit)
Expand Down Expand Up @@ -292,7 +292,7 @@ func (c *RaftCluster) Start(s Server) error {
}

c.isAPIServiceMode = s.IsAPIServiceMode()
err := c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetStorage(), s.GetBasicCluster(), s.GetHBStreams(), s.GetKeyspaceGroupManager())
err := c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetStorage(), s.GetHBStreams(), s.GetKeyspaceGroupManager())
if err != nil {
return err
}
Expand Down
61 changes: 30 additions & 31 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestStoreHeartbeat(t *testing.T) {
_, opt, err := newTestScheduleConfig()
opt.GetScheduleConfig().StoreLimitVersion = "v2"
re.NoError(err)
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend())

n, np := uint64(3), uint64(3)
stores := newTestStores(n, "2.0.0")
Expand Down Expand Up @@ -201,7 +201,7 @@ func TestFilterUnhealthyStore(t *testing.T) {

_, opt, err := newTestScheduleConfig()
re.NoError(err)
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend())

stores := newTestStores(3, "2.0.0")
req := &pdpb.StoreHeartbeatRequest{}
Expand Down Expand Up @@ -239,7 +239,7 @@ func TestSetOfflineStore(t *testing.T) {

_, opt, err := newTestScheduleConfig()
re.NoError(err)
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend())
cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil)
cluster.ruleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts())
if opt.IsPlacementRulesEnabled() {
Expand Down Expand Up @@ -305,7 +305,7 @@ func TestSetOfflineWithReplica(t *testing.T) {

_, opt, err := newTestScheduleConfig()
re.NoError(err)
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend())
cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil)

// Put 4 stores.
Expand Down Expand Up @@ -344,7 +344,7 @@ func TestSetOfflineStoreWithEvictLeader(t *testing.T) {
_, opt, err := newTestScheduleConfig()
re.NoError(err)
opt.SetMaxReplicas(1)
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend())
cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil)

// Put 3 stores.
Expand All @@ -371,7 +371,7 @@ func TestForceBuryStore(t *testing.T) {

_, opt, err := newTestScheduleConfig()
re.NoError(err)
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend())
// Put 2 stores.
stores := newTestStores(2, "5.3.0")
stores[1] = stores[1].Clone(core.SetLastHeartbeatTS(time.Now()))
Expand All @@ -390,7 +390,7 @@ func TestReuseAddress(t *testing.T) {

_, opt, err := newTestScheduleConfig()
re.NoError(err)
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend())
cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil)
// Put 4 stores.
for _, store := range newTestStores(4, "2.0.0") {
Expand Down Expand Up @@ -436,7 +436,7 @@ func TestUpStore(t *testing.T) {

_, opt, err := newTestScheduleConfig()
re.NoError(err)
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend())
cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil)
cluster.ruleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts())
if opt.IsPlacementRulesEnabled() {
Expand Down Expand Up @@ -481,7 +481,7 @@ func TestRemovingProcess(t *testing.T) {

_, opt, err := newTestScheduleConfig()
re.NoError(err)
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend())
cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil)
cluster.SetPrepared()

Expand Down Expand Up @@ -539,7 +539,7 @@ func TestDeleteStoreUpdatesClusterVersion(t *testing.T) {

_, opt, err := newTestScheduleConfig()
re.NoError(err)
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend())
cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil)
cluster.ruleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts())
if opt.IsPlacementRulesEnabled() {
Expand Down Expand Up @@ -574,7 +574,7 @@ func TestStoreClusterVersion(t *testing.T) {

_, opt, err := newTestScheduleConfig()
re.NoError(err)
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend())
stores := newTestStores(3, "5.0.0")
s1, s2, s3 := stores[0].GetMeta(), stores[1].GetMeta(), stores[2].GetMeta()
s1.Version = "5.0.1"
Expand All @@ -599,7 +599,7 @@ func TestRegionHeartbeatHotStat(t *testing.T) {

_, opt, err := newTestScheduleConfig()
re.NoError(err)
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend())
cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil)
newTestStores(4, "2.0.0")
peers := []*metapb.Peer{
Expand Down Expand Up @@ -661,7 +661,7 @@ func TestBucketHeartbeat(t *testing.T) {

_, opt, err := newTestScheduleConfig()
re.NoError(err)
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend())
cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil)

// case1: region is not exist
Expand Down Expand Up @@ -718,7 +718,7 @@ func TestRegionHeartbeat(t *testing.T) {

_, opt, err := newTestScheduleConfig()
re.NoError(err)
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend())
cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil)
n, np := uint64(3), uint64(3)
cluster.wg.Add(1)
Expand Down Expand Up @@ -963,7 +963,7 @@ func TestRegionFlowChanged(t *testing.T) {

_, opt, err := newTestScheduleConfig()
re.NoError(err)
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend())
cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil)
regions := []*core.RegionInfo{core.NewTestRegionInfo(1, 1, []byte{}, []byte{})}
processRegions := func(regions []*core.RegionInfo) {
Expand All @@ -988,7 +988,7 @@ func TestRegionSizeChanged(t *testing.T) {

_, opt, err := newTestScheduleConfig()
re.NoError(err)
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend())
cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil)
cluster.regionStats = statistics.NewRegionStatistics(
cluster.GetBasicCluster(),
Expand Down Expand Up @@ -1034,7 +1034,7 @@ func TestConcurrentReportBucket(t *testing.T) {

_, opt, err := newTestScheduleConfig()
re.NoError(err)
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend())
cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil)

regions := []*core.RegionInfo{core.NewTestRegionInfo(1, 1, []byte{}, []byte{})}
Expand Down Expand Up @@ -1064,7 +1064,7 @@ func TestConcurrentRegionHeartbeat(t *testing.T) {

_, opt, err := newTestScheduleConfig()
re.NoError(err)
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend())
cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil)

regions := []*core.RegionInfo{core.NewTestRegionInfo(1, 1, []byte{}, []byte{})}
Expand Down Expand Up @@ -1105,7 +1105,7 @@ func TestRegionLabelIsolationLevel(t *testing.T) {
cfg.LocationLabels = []string{"zone"}
opt.SetReplicationConfig(cfg)
re.NoError(err)
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend())

for i := uint64(1); i <= 4; i++ {
var labels []*metapb.StoreLabel
Expand Down Expand Up @@ -1184,7 +1184,7 @@ func TestHeartbeatSplit(t *testing.T) {

_, opt, err := newTestScheduleConfig()
re.NoError(err)
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend())
cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil)

// 1: [nil, nil)
Expand Down Expand Up @@ -1228,7 +1228,7 @@ func TestRegionSplitAndMerge(t *testing.T) {

_, opt, err := newTestScheduleConfig()
re.NoError(err)
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend())
cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil)

regions := []*core.RegionInfo{core.NewTestRegionInfo(1, 1, []byte{}, []byte{})}
Expand Down Expand Up @@ -1266,7 +1266,7 @@ func TestOfflineAndMerge(t *testing.T) {

_, opt, err := newTestScheduleConfig()
re.NoError(err)
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend())
cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil)
cluster.ruleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts())
if opt.IsPlacementRulesEnabled() {
Expand Down Expand Up @@ -1634,7 +1634,7 @@ func TestCalculateStoreSize1(t *testing.T) {
cfg := opt.GetReplicationConfig()
cfg.EnablePlacementRules = true
opt.SetReplicationConfig(cfg)
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend())
cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil)
cluster.regionStats = statistics.NewRegionStatistics(
cluster.GetBasicCluster(),
Expand Down Expand Up @@ -1720,7 +1720,7 @@ func TestCalculateStoreSize2(t *testing.T) {
cfg.EnablePlacementRules = true
opt.SetReplicationConfig(cfg)
opt.SetMaxReplicas(3)
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend())
cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil)
cluster.regionStats = statistics.NewRegionStatistics(
cluster.GetBasicCluster(),
Expand Down Expand Up @@ -1829,7 +1829,7 @@ func Test(t *testing.T) {
regions := newTestRegions(n, n, np)
_, opts, err := newTestScheduleConfig()
re.NoError(err)
tc := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opts, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
tc := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opts, storage.NewStorageWithMemoryBackend())
cache := tc.core

for i := uint64(0); i < n; i++ {
Expand Down Expand Up @@ -1943,7 +1943,7 @@ func TestAwakenStore(t *testing.T) {

_, opt, err := newTestScheduleConfig()
re.NoError(err)
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend())
n := uint64(3)
stores := newTestStores(n, "6.5.0")
re.True(stores[0].NeedAwakenStore())
Expand Down Expand Up @@ -1997,7 +1997,7 @@ func TestUpdateAndDeleteLabel(t *testing.T) {

_, opt, err := newTestScheduleConfig()
re.NoError(err)
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend())
stores := newTestStores(1, "6.5.1")
for _, store := range stores {
re.NoError(cluster.PutStore(store.GetMeta()))
Expand Down Expand Up @@ -2115,7 +2115,7 @@ func newTestScheduleConfig() (*sc.ScheduleConfig, *config.PersistOptions, error)
}

func newTestCluster(ctx context.Context, opt *config.PersistOptions) *testCluster {
rc := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
rc := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend())
storage := storage.NewStorageWithMemoryBackend()
rc.regionLabeler, _ = labeler.NewRegionLabeler(ctx, storage, time.Second*5)

Expand All @@ -2127,10 +2127,9 @@ func newTestRaftCluster(
id id.Allocator,
opt *config.PersistOptions,
s storage.Storage,
basicCluster *core.BasicCluster,
) *RaftCluster {
rc := &RaftCluster{serverCtx: ctx}
rc.InitCluster(id, opt, s, basicCluster, nil, nil)
rc := &RaftCluster{serverCtx: ctx, core: core.NewBasicCluster()}
rc.InitCluster(id, opt, s, nil, nil)
rc.ruleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), rc, opt)
if opt.IsPlacementRulesEnabled() {
err := rc.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel())
Expand Down
5 changes: 2 additions & 3 deletions server/cluster/cluster_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/mock/mockid"
"github.com/tikv/pd/pkg/storage"
)
Expand All @@ -33,7 +32,7 @@ func TestReportSplit(t *testing.T) {

_, opt, err := newTestScheduleConfig()
re.NoError(err)
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend())
left := &metapb.Region{Id: 1, StartKey: []byte("a"), EndKey: []byte("b")}
right := &metapb.Region{Id: 2, StartKey: []byte("b"), EndKey: []byte("c")}
_, err = cluster.HandleReportSplit(&pdpb.ReportSplitRequest{Left: left, Right: right})
Expand All @@ -49,7 +48,7 @@ func TestReportBatchSplit(t *testing.T) {

_, opt, err := newTestScheduleConfig()
re.NoError(err)
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend())
regions := []*metapb.Region{
{Id: 1, StartKey: []byte(""), EndKey: []byte("a")},
{Id: 2, StartKey: []byte("a"), EndKey: []byte("b")},
Expand Down
2 changes: 1 addition & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ func (s *Server) startServer(ctx context.Context) error {

s.gcSafePointManager = gc.NewSafePointManager(s.storage, s.cfg.PDServerCfg)
s.basicCluster = core.NewBasicCluster()
s.cluster = cluster.NewRaftCluster(ctx, s.clusterID, syncer.NewRegionSyncer(s), s.client, s.httpClient)
s.cluster = cluster.NewRaftCluster(ctx, s.clusterID, s.GetBasicCluster(), syncer.NewRegionSyncer(s), s.client, s.httpClient)
keyspaceIDAllocator := id.NewAllocator(&id.AllocatorParams{
Client: s.client,
RootPath: s.rootPath,
Expand Down
Loading

0 comments on commit f2f8b84

Please sign in to comment.