From 84f7f243346a806eb528e7c4e98ec51c3e35fd93 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Mon, 20 Mar 2023 18:44:44 +0800 Subject: [PATCH 1/4] add more tso tests Signed-off-by: Ryan Leung --- client/pd_service_discovery.go | 3 + pkg/tso/allocator_manager.go | 4 + tests/cluster.go | 14 ++ tests/integrations/mcs/testutil.go | 11 +- tests/integrations/mcs/tso/server_test.go | 2 +- tests/integrations/tso/client_test.go | 199 ++++++++++++++++++++++ 6 files changed, 230 insertions(+), 3 deletions(-) diff --git a/client/pd_service_discovery.go b/client/pd_service_discovery.go index 73074943304..e39ff7f9c50 100644 --- a/client/pd_service_discovery.go +++ b/client/pd_service_discovery.go @@ -223,6 +223,9 @@ func (c *pdServiceDiscovery) updateMemberLoop() { } func (c *pdServiceDiscovery) updateServiceModeLoop() { + failpoint.Inject("skipUpdateServiceMode", func() { + failpoint.Return() + }) defer c.wg.Done() ctx, cancel := context.WithCancel(c.ctx) diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index a3de87311bb..f73da9546b5 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -610,6 +610,10 @@ func (am *AllocatorManager) AllocatorDaemon(serverCtx context.Context) { defer patrolTicker.Stop() } tsTicker := time.NewTicker(am.updatePhysicalInterval) + failpoint.Inject("fastUpdatePhysicalInterval", func() { + tsTicker.Stop() + tsTicker = time.NewTicker(time.Millisecond) + }) defer tsTicker.Stop() checkerTicker := time.NewTicker(PriorityCheck) defer checkerTicker.Stop() diff --git a/tests/cluster.go b/tests/cluster.go index dbb15c199b8..1f27e72bcad 100644 --- a/tests/cluster.go +++ b/tests/cluster.go @@ -717,6 +717,20 @@ func (c *TestCluster) Join(ctx context.Context, opts ...ConfigOption) (*TestServ return s, nil } +// JoinAPIServer is used to add a new TestAPIServer into the cluster. +func (c *TestCluster) JoinAPIServer(ctx context.Context, opts ...ConfigOption) (*TestServer, error) { + conf, err := c.config.Join().Generate(opts...) + if err != nil { + return nil, err + } + s, err := NewTestAPIServer(ctx, conf) + if err != nil { + return nil, err + } + c.servers[conf.Name] = s + return s, nil +} + // Destroy is used to destroy a TestCluster. func (c *TestCluster) Destroy() { for _, s := range c.servers { diff --git a/tests/integrations/mcs/testutil.go b/tests/integrations/mcs/testutil.go index fdd4f5a1ed2..b277b9158cb 100644 --- a/tests/integrations/mcs/testutil.go +++ b/tests/integrations/mcs/testutil.go @@ -29,13 +29,20 @@ import ( bs "github.com/tikv/pd/pkg/basicserver" ) -// SetupTSOClient creates a TSO client for test. -func SetupTSOClient(ctx context.Context, re *require.Assertions, endpoints []string, opts ...pd.ClientOption) pd.Client { +// SetupClientWithKeyspace creates a TSO client for test. +func SetupClientWithKeyspace(ctx context.Context, re *require.Assertions, endpoints []string, opts ...pd.ClientOption) pd.Client { cli, err := pd.NewClientWithKeyspace(ctx, utils.DefaultKeyspaceID, endpoints, pd.SecurityOption{}, opts...) re.NoError(err) return cli } +// SetupClient creates a TSO client for test. +func SetupClient(ctx context.Context, re *require.Assertions, endpoints []string, opts ...pd.ClientOption) pd.Client { + cli, err := pd.NewClientWithContext(ctx, endpoints, pd.SecurityOption{}, opts...) + re.NoError(err) + return cli +} + // StartSingleResourceManagerTestServer creates and starts a resource manager server with default config for testing. 
func StartSingleResourceManagerTestServer(ctx context.Context, re *require.Assertions, backendEndpoints, listenAddrs string) (*rm.Server, func()) { cfg := rm.NewConfig() diff --git a/tests/integrations/mcs/tso/server_test.go b/tests/integrations/mcs/tso/server_test.go index 882ef3db56a..66e015f7150 100644 --- a/tests/integrations/mcs/tso/server_test.go +++ b/tests/integrations/mcs/tso/server_test.go @@ -154,7 +154,7 @@ func checkTSOPath(re *require.Assertions, isAPIServiceMode bool) { _, cleanup := mcs.StartSingleTSOTestServer(ctx, re, backendEndpoints, tempurl.Alloc()) defer cleanup() - cli := mcs.SetupTSOClient(ctx, re, []string{backendEndpoints}) + cli := mcs.SetupClientWithKeyspace(ctx, re, []string{backendEndpoints}) physical, logical, err := cli.GetTS(ctx) re.NoError(err) ts := tsoutil.ComposeTS(physical, logical) diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go index a79ed60b66d..584ee8cfbaa 100644 --- a/tests/integrations/tso/client_test.go +++ b/tests/integrations/tso/client_test.go @@ -17,11 +17,14 @@ package tso import ( "context" "math" + "math/rand" "strings" "sync" "testing" + "time" "github.com/pingcap/failpoint" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" pd "github.com/tikv/pd/client" "github.com/tikv/pd/client/testutil" @@ -168,3 +171,199 @@ func (suite *tsoClientTestSuite) TestUpdateAfterResetTSO() { }) re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/delaySyncTimestamp")) } + +func TestRandomTransferAPILeader(t *testing.T) { + re := require.New(t) + + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)")) + defer re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)")) + r := rand.New(rand.NewSource(time.Now().UnixNano())) + ctx, cancel := context.WithCancel(context.Background()) + cluster, err := tests.NewTestAPICluster(ctx, 3) + re.NoError(err) + defer cancel() + defer cluster.Destroy() + + err = cluster.RunInitialServers() + re.NoError(err) + + leaderServer := cluster.GetServer(cluster.WaitLeader()) + backendEndpoints := leaderServer.GetAddr() + + _, cleanup := mcs.StartSingleTSOTestServer(ctx, re, backendEndpoints) + defer cleanup() + + cli1 := mcs.SetupClientWithKeyspace(ctx, re, strings.Split(backendEndpoints, ",")) + cli2 := mcs.SetupClientWithKeyspace(ctx, re, strings.Split(backendEndpoints, ",")) + + var wg sync.WaitGroup + wg.Add(tsoRequestConcurrencyNumber + 1) + go func() { + defer wg.Done() + n := r.Intn(3) + 1 + time.Sleep(time.Duration(n) * time.Second) + leaderServer.ResignLeader() + leaderServer = cluster.GetServer(cluster.WaitLeader()) + }() + + for i := 0; i < tsoRequestConcurrencyNumber; i++ { + go func() { + defer wg.Done() + var lastTS uint64 + for i := 0; i < tsoRequestRound; i++ { + time.Sleep(time.Millisecond * 30) + physical, logical, err := cli1.GetTS(context.Background()) + re.NoError(err) + ts := tsoutil.ComposeTS(physical, logical) + re.Less(lastTS, ts) + lastTS = ts + physical, logical, err = cli2.GetTS(context.Background()) + re.NoError(err) + ts = tsoutil.ComposeTS(physical, logical) + re.Less(lastTS, ts) + lastTS = ts + } + }() + } + wg.Wait() +} + +// When we upgrade the PD cluster, there may be a period of time that the old and new PDs are running at the same time. 
+func TestMixedTSODeployment(t *testing.T) { + re := require.New(t) + + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)")) + defer re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)")) + re.NoError(failpoint.Enable("github.com/tikv/pd/client/skipUpdateServiceMode", "return(true)")) + defer re.NoError(failpoint.Enable("github.com/tikv/pd/client/skipUpdateServiceMode", "return(true)")) + + r := rand.New(rand.NewSource(time.Now().UnixNano())) + ctx, cancel := context.WithCancel(context.Background()) + cluster, err := tests.NewTestCluster(ctx, 1) + re.NoError(err) + defer cancel() + defer cluster.Destroy() + + err = cluster.RunInitialServers() + re.NoError(err) + + leaderServer := cluster.GetServer(cluster.WaitLeader()) + backendEndpoints := leaderServer.GetAddr() + + apiSvr, err := cluster.JoinAPIServer(ctx) + re.NoError(err) + err = apiSvr.Run() + re.NoError(err) + + _, cleanup := mcs.StartSingleTSOTestServer(ctx, re, backendEndpoints) + defer cleanup() + + cli1 := mcs.SetupClientWithKeyspace(ctx, re, strings.Split(backendEndpoints, ",")) + cli2 := mcs.SetupClientWithKeyspace(ctx, re, strings.Split(backendEndpoints, ",")) + + var wg sync.WaitGroup + wg.Add(tsoRequestConcurrencyNumber + 1) + go func() { + defer wg.Done() + for i := 0; i < 2; i++ { + n := r.Intn(3) + 1 + time.Sleep(time.Duration(n) * time.Second) + leaderServer.ResignLeader() + leaderServer = cluster.GetServer(cluster.WaitLeader()) + } + }() + + for i := 0; i < tsoRequestConcurrencyNumber; i++ { + go func() { + defer wg.Done() + var ts, lastTS uint64 + for i := 0; i < tsoRequestRound; i++ { + time.Sleep(time.Millisecond * 30) + physical, logical, err := cli1.GetTS(context.Background()) + if err != nil { + re.ErrorContains(err, "not leader") + } else { + ts = tsoutil.ComposeTS(physical, logical) + re.Less(lastTS, ts) + lastTS = ts + } + physical, logical, _ = cli2.GetTS(context.Background()) + if err != nil { + re.ErrorContains(err, "not leader") + } else { + ts = tsoutil.ComposeTS(physical, logical) + re.Less(lastTS, ts) + lastTS = ts + } + } + }() + } + wg.Wait() +} + +func TestRandomShutdown(t *testing.T) { + re := require.New(t) + + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)")) + defer re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)")) + r := rand.New(rand.NewSource(time.Now().UnixNano())) + ctx, cancel := context.WithCancel(context.Background()) + cluster, err := tests.NewTestAPICluster(ctx, 3) + re.NoError(err) + defer cancel() + defer cluster.Destroy() + + err = cluster.RunInitialServers() + re.NoError(err) + + leaderServer := cluster.GetServer(cluster.WaitLeader()) + backendEndpoints := leaderServer.GetAddr() + + tsoSvr1, cleanup := mcs.StartSingleTSOTestServer(ctx, re, backendEndpoints) + defer cleanup() + tsoSvr2, cleanup := mcs.StartSingleTSOTestServer(ctx, re, backendEndpoints) + defer cleanup() + + cli1 := mcs.SetupClientWithKeyspace(ctx, re, strings.Split(backendEndpoints, ",")) + cli2 := mcs.SetupClientWithKeyspace(ctx, re, strings.Split(backendEndpoints, ",")) + + var wg sync.WaitGroup + wg.Add(tsoRequestConcurrencyNumber + 1) + go func() { + defer wg.Done() + n := r.Intn(3) + 1 + time.Sleep(time.Duration(n) * time.Second) + if r.Intn(2) == 0 { + tsoSvr1.Close() + } else { + tsoSvr2.Close() + } + }() + + for i := 0; i < tsoRequestConcurrencyNumber; i++ { + go func() { + defer wg.Done() + var ts, lastTS uint64 
+ for i := 0; i < tsoRequestRound; i++ { + time.Sleep(time.Millisecond * 30) + physical, logical, err := cli1.GetTS(context.Background()) + if err != nil { + re.ErrorContains(err, "server not started") + } else { + ts = tsoutil.ComposeTS(physical, logical) + re.Less(lastTS, ts) + lastTS = ts + } + physical, logical, err = cli2.GetTS(context.Background()) + if err != nil { + re.ErrorContains(err, "server not started") + } else { + ts = tsoutil.ComposeTS(physical, logical) + re.Less(lastTS, ts) + lastTS = ts + } + } + }() + } + wg.Wait() +} From f5a35d07d12e0234cc7b93a2bf4987e69d116eb6 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Wed, 22 Mar 2023 10:45:15 +0800 Subject: [PATCH 2/4] resolve the conflicts Signed-off-by: Ryan Leung --- tests/integrations/tso/client_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go index 584ee8cfbaa..2dae5af4d1b 100644 --- a/tests/integrations/tso/client_test.go +++ b/tests/integrations/tso/client_test.go @@ -83,7 +83,7 @@ func (suite *tsoClientTestSuite) SetupSuite() { re.NoError(err) } else { suite.tsoServer, suite.tsoServerCleanup = mcs.StartSingleTSOTestServer(suite.ctx, re, backendEndpoints, tempurl.Alloc()) - suite.client = mcs.SetupTSOClient(suite.ctx, re, strings.Split(backendEndpoints, ",")) + suite.client = mcs.SetupClientWithKeyspace(suite.ctx, re, strings.Split(backendEndpoints, ",")) } } From fe2abc01d815f3fd4f06919fc308de35fdfcec6f Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Fri, 24 Mar 2023 16:09:19 +0800 Subject: [PATCH 3/4] improve the tests Signed-off-by: Ryan Leung --- server/cluster/cluster.go | 2 - tests/integrations/tso/client_test.go | 196 ++++++++++++-------------- 2 files changed, 87 insertions(+), 111 deletions(-) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index d7c2220a722..c4b9db33ea5 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -264,7 +264,6 @@ func (c *RaftCluster) Start(s Server) error { if cluster == nil { return nil } - c.ruleManager = placement.NewRuleManager(c.storage, c, c.GetOpts()) if c.opt.IsPlacementRulesEnabled() { err = c.ruleManager.Initialize(c.opt.GetMaxReplicas(), c.opt.GetLocationLabels()) @@ -272,7 +271,6 @@ func (c *RaftCluster) Start(s Server) error { return err } } - c.regionLabeler, err = labeler.NewRegionLabeler(c.ctx, c.storage, regionLabelGCInterval) if err != nil { return err diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go index 2dae5af4d1b..8c09bbc1c31 100644 --- a/tests/integrations/tso/client_test.go +++ b/tests/integrations/tso/client_test.go @@ -47,6 +47,8 @@ type tsoClientTestSuite struct { tsoServer *tso.Server tsoServerCleanup func() + backendEndpoints string + client pd.TSOClient } @@ -77,13 +79,14 @@ func (suite *tsoClientTestSuite) SetupSuite() { re.NoError(err) leaderName := suite.cluster.WaitLeader() pdLeader := suite.cluster.GetServer(leaderName) - backendEndpoints := pdLeader.GetAddr() + re.NoError(pdLeader.BootstrapCluster()) + suite.backendEndpoints = pdLeader.GetAddr() if suite.legacy { - suite.client, err = pd.NewClientWithContext(suite.ctx, strings.Split(backendEndpoints, ","), pd.SecurityOption{}) + suite.client, err = pd.NewClientWithContext(suite.ctx, strings.Split(suite.backendEndpoints, ","), pd.SecurityOption{}) re.NoError(err) } else { - suite.tsoServer, suite.tsoServerCleanup = mcs.StartSingleTSOTestServer(suite.ctx, re, backendEndpoints, tempurl.Alloc()) - suite.client = 
mcs.SetupClientWithKeyspace(suite.ctx, re, strings.Split(backendEndpoints, ",")) + suite.tsoServer, suite.tsoServerCleanup = mcs.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc()) + suite.client = mcs.SetupClientWithKeyspace(suite.ctx, re, strings.Split(suite.backendEndpoints, ",")) } } @@ -172,143 +175,119 @@ func (suite *tsoClientTestSuite) TestUpdateAfterResetTSO() { re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/delaySyncTimestamp")) } -func TestRandomTransferAPILeader(t *testing.T) { - re := require.New(t) +func (suite *tsoClientTestSuite) TestRandomTransferLeader() { + re := suite.Require() re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)")) defer re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)")) r := rand.New(rand.NewSource(time.Now().UnixNano())) - ctx, cancel := context.WithCancel(context.Background()) - cluster, err := tests.NewTestAPICluster(ctx, 3) - re.NoError(err) - defer cancel() - defer cluster.Destroy() - - err = cluster.RunInitialServers() - re.NoError(err) - - leaderServer := cluster.GetServer(cluster.WaitLeader()) - backendEndpoints := leaderServer.GetAddr() - - _, cleanup := mcs.StartSingleTSOTestServer(ctx, re, backendEndpoints) - defer cleanup() - - cli1 := mcs.SetupClientWithKeyspace(ctx, re, strings.Split(backendEndpoints, ",")) - cli2 := mcs.SetupClientWithKeyspace(ctx, re, strings.Split(backendEndpoints, ",")) + ctx, cancel := context.WithCancel(suite.ctx) var wg sync.WaitGroup wg.Add(tsoRequestConcurrencyNumber + 1) go func() { defer wg.Done() - n := r.Intn(3) + 1 + n := r.Intn(2) + 1 time.Sleep(time.Duration(n) * time.Second) - leaderServer.ResignLeader() - leaderServer = cluster.GetServer(cluster.WaitLeader()) + err := suite.cluster.ResignLeader() + re.NoError(err) + suite.cluster.WaitLeader() + cancel() }() for i := 0; i < tsoRequestConcurrencyNumber; i++ { go func() { defer wg.Done() - var lastTS uint64 - for i := 0; i < tsoRequestRound; i++ { - time.Sleep(time.Millisecond * 30) - physical, logical, err := cli1.GetTS(context.Background()) - re.NoError(err) - ts := tsoutil.ComposeTS(physical, logical) - re.Less(lastTS, ts) - lastTS = ts - physical, logical, err = cli2.GetTS(context.Background()) - re.NoError(err) - ts = tsoutil.ComposeTS(physical, logical) - re.Less(lastTS, ts) - lastTS = ts + client := mcs.SetupClientWithKeyspace(suite.ctx, re, strings.Split(suite.backendEndpoints, ",")) + var ts, lastTS uint64 + for { + physical, logical, err := client.GetTS(suite.ctx) + if err != nil { + re.ErrorContains(err, "not leader") + } else { + ts = tsoutil.ComposeTS(physical, logical) + re.Less(lastTS, ts) + lastTS = ts + } + select { + case <-ctx.Done(): + return + default: + } } }() } wg.Wait() } -// When we upgrade the PD cluster, there may be a period of time that the old and new PDs are running at the same time. 
-func TestMixedTSODeployment(t *testing.T) { - re := require.New(t) - +func (suite *tsoClientTestSuite) TestRandomShutdown() { + re := suite.Require() re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)")) defer re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)")) - re.NoError(failpoint.Enable("github.com/tikv/pd/client/skipUpdateServiceMode", "return(true)")) - defer re.NoError(failpoint.Enable("github.com/tikv/pd/client/skipUpdateServiceMode", "return(true)")) - - r := rand.New(rand.NewSource(time.Now().UnixNano())) - ctx, cancel := context.WithCancel(context.Background()) - cluster, err := tests.NewTestCluster(ctx, 1) - re.NoError(err) - defer cancel() - defer cluster.Destroy() - - err = cluster.RunInitialServers() - re.NoError(err) - - leaderServer := cluster.GetServer(cluster.WaitLeader()) - backendEndpoints := leaderServer.GetAddr() - - apiSvr, err := cluster.JoinAPIServer(ctx) - re.NoError(err) - err = apiSvr.Run() - re.NoError(err) - _, cleanup := mcs.StartSingleTSOTestServer(ctx, re, backendEndpoints) + tsoSvr, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc()) defer cleanup() - cli1 := mcs.SetupClientWithKeyspace(ctx, re, strings.Split(backendEndpoints, ",")) - cli2 := mcs.SetupClientWithKeyspace(ctx, re, strings.Split(backendEndpoints, ",")) - + ctx, cancel := context.WithCancel(suite.ctx) var wg sync.WaitGroup wg.Add(tsoRequestConcurrencyNumber + 1) go func() { defer wg.Done() - for i := 0; i < 2; i++ { - n := r.Intn(3) + 1 - time.Sleep(time.Duration(n) * time.Second) - leaderServer.ResignLeader() - leaderServer = cluster.GetServer(cluster.WaitLeader()) + r := rand.New(rand.NewSource(time.Now().UnixNano())) + n := r.Intn(2) + 1 + time.Sleep(time.Duration(n) * time.Second) + if !suite.legacy { + // random close one of the tso servers + if r.Intn(2) == 0 { + tsoSvr.Close() + } else { + suite.tsoServer.Close() + } + } else { + // close pd leader server + suite.cluster.GetServer(suite.cluster.GetLeader()).GetServer().Close() } + cancel() }() for i := 0; i < tsoRequestConcurrencyNumber; i++ { go func() { defer wg.Done() + client := mcs.SetupClientWithKeyspace(suite.ctx, re, strings.Split(suite.backendEndpoints, ",")) var ts, lastTS uint64 - for i := 0; i < tsoRequestRound; i++ { - time.Sleep(time.Millisecond * 30) - physical, logical, err := cli1.GetTS(context.Background()) + for { + physical, logical, err := client.GetTS(suite.ctx) if err != nil { - re.ErrorContains(err, "not leader") + re.Regexp("(server not started|closed unexpectedly)", err.Error()) } else { ts = tsoutil.ComposeTS(physical, logical) re.Less(lastTS, ts) lastTS = ts } - physical, logical, _ = cli2.GetTS(context.Background()) - if err != nil { - re.ErrorContains(err, "not leader") - } else { - ts = tsoutil.ComposeTS(physical, logical) - re.Less(lastTS, ts) - lastTS = ts + select { + case <-ctx.Done(): + return + default: } } }() } wg.Wait() + suite.TearDownSuite() + suite.SetupSuite() } -func TestRandomShutdown(t *testing.T) { +// When we upgrade the PD cluster, there may be a period of time that the old and new PDs are running at the same time. 
+func TestMixedTSODeployment(t *testing.T) { re := require.New(t) re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)")) defer re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)")) - r := rand.New(rand.NewSource(time.Now().UnixNano())) + re.NoError(failpoint.Enable("github.com/tikv/pd/client/skipUpdateServiceMode", "return(true)")) + defer re.NoError(failpoint.Enable("github.com/tikv/pd/client/skipUpdateServiceMode", "return(true)")) + ctx, cancel := context.WithCancel(context.Background()) - cluster, err := tests.NewTestAPICluster(ctx, 3) + cluster, err := tests.NewTestCluster(ctx, 1) re.NoError(err) defer cancel() defer cluster.Destroy() @@ -319,48 +298,47 @@ func TestRandomShutdown(t *testing.T) { leaderServer := cluster.GetServer(cluster.WaitLeader()) backendEndpoints := leaderServer.GetAddr() - tsoSvr1, cleanup := mcs.StartSingleTSOTestServer(ctx, re, backendEndpoints) - defer cleanup() - tsoSvr2, cleanup := mcs.StartSingleTSOTestServer(ctx, re, backendEndpoints) - defer cleanup() + apiSvr, err := cluster.JoinAPIServer(ctx) + re.NoError(err) + err = apiSvr.Run() + re.NoError(err) - cli1 := mcs.SetupClientWithKeyspace(ctx, re, strings.Split(backendEndpoints, ",")) - cli2 := mcs.SetupClientWithKeyspace(ctx, re, strings.Split(backendEndpoints, ",")) + _, cleanup := mcs.StartSingleTSOTestServer(ctx, re, backendEndpoints, tempurl.Alloc()) + defer cleanup() + ctx1, cancel1 := context.WithCancel(context.Background()) var wg sync.WaitGroup wg.Add(tsoRequestConcurrencyNumber + 1) go func() { defer wg.Done() - n := r.Intn(3) + 1 - time.Sleep(time.Duration(n) * time.Second) - if r.Intn(2) == 0 { - tsoSvr1.Close() - } else { - tsoSvr2.Close() + r := rand.New(rand.NewSource(time.Now().UnixNano())) + for i := 0; i < 2; i++ { + n := r.Intn(2) + 1 + time.Sleep(time.Duration(n) * time.Second) + leaderServer.ResignLeader() + leaderServer = cluster.GetServer(cluster.WaitLeader()) } + cancel1() }() for i := 0; i < tsoRequestConcurrencyNumber; i++ { go func() { defer wg.Done() + cli := mcs.SetupClientWithKeyspace(ctx, re, strings.Split(backendEndpoints, ",")) var ts, lastTS uint64 - for i := 0; i < tsoRequestRound; i++ { - time.Sleep(time.Millisecond * 30) - physical, logical, err := cli1.GetTS(context.Background()) + for { + physical, logical, err := cli.GetTS(ctx) if err != nil { - re.ErrorContains(err, "server not started") + re.Regexp("(context canceled|not leader|not the pd or local tso allocator leader)", err.Error()) } else { ts = tsoutil.ComposeTS(physical, logical) re.Less(lastTS, ts) lastTS = ts } - physical, logical, err = cli2.GetTS(context.Background()) - if err != nil { - re.ErrorContains(err, "server not started") - } else { - ts = tsoutil.ComposeTS(physical, logical) - re.Less(lastTS, ts) - lastTS = ts + select { + case <-ctx1.Done(): + return + default: } } }() From 93e9df674d7b3b57e222ceb749bb82e427b1686f Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Fri, 24 Mar 2023 18:34:18 +0800 Subject: [PATCH 4/4] fix Signed-off-by: Ryan Leung --- tests/integrations/tso/client_test.go | 65 ++++++--------------------- 1 file changed, 14 insertions(+), 51 deletions(-) diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go index 8c09bbc1c31..c98caa9db7f 100644 --- a/tests/integrations/tso/client_test.go +++ b/tests/integrations/tso/client_test.go @@ -195,28 +195,7 @@ func (suite *tsoClientTestSuite) TestRandomTransferLeader() { cancel() }() - for i := 0; i < 
tsoRequestConcurrencyNumber; i++ { - go func() { - defer wg.Done() - client := mcs.SetupClientWithKeyspace(suite.ctx, re, strings.Split(suite.backendEndpoints, ",")) - var ts, lastTS uint64 - for { - physical, logical, err := client.GetTS(suite.ctx) - if err != nil { - re.ErrorContains(err, "not leader") - } else { - ts = tsoutil.ComposeTS(physical, logical) - re.Less(lastTS, ts) - lastTS = ts - } - select { - case <-ctx.Done(): - return - default: - } - } - }() - } + checkTSO(ctx, re, &wg, suite.backendEndpoints) wg.Wait() } @@ -250,28 +229,7 @@ func (suite *tsoClientTestSuite) TestRandomShutdown() { cancel() }() - for i := 0; i < tsoRequestConcurrencyNumber; i++ { - go func() { - defer wg.Done() - client := mcs.SetupClientWithKeyspace(suite.ctx, re, strings.Split(suite.backendEndpoints, ",")) - var ts, lastTS uint64 - for { - physical, logical, err := client.GetTS(suite.ctx) - if err != nil { - re.Regexp("(server not started|closed unexpectedly)", err.Error()) - } else { - ts = tsoutil.ComposeTS(physical, logical) - re.Less(lastTS, ts) - lastTS = ts - } - select { - case <-ctx.Done(): - return - default: - } - } - }() - } + checkTSO(ctx, re, &wg, suite.backendEndpoints) wg.Wait() suite.TearDownSuite() suite.SetupSuite() @@ -320,28 +278,33 @@ func TestMixedTSODeployment(t *testing.T) { } cancel1() }() + checkTSO(ctx1, re, &wg, backendEndpoints) + wg.Wait() +} +func checkTSO(ctx context.Context, re *require.Assertions, wg *sync.WaitGroup, backendEndpoints string) { for i := 0; i < tsoRequestConcurrencyNumber; i++ { go func() { defer wg.Done() - cli := mcs.SetupClientWithKeyspace(ctx, re, strings.Split(backendEndpoints, ",")) + cli := mcs.SetupClientWithKeyspace(context.Background(), re, strings.Split(backendEndpoints, ",")) var ts, lastTS uint64 for { - physical, logical, err := cli.GetTS(ctx) - if err != nil { - re.Regexp("(context canceled|not leader|not the pd or local tso allocator leader)", err.Error()) - } else { + physical, logical, err := cli.GetTS(context.Background()) + // omit the error check since there are many kinds of errors + if err == nil { ts = tsoutil.ComposeTS(physical, logical) re.Less(lastTS, ts) lastTS = ts } select { - case <-ctx1.Done(): + case <-ctx.Done(): + physical, logical, _ := cli.GetTS(context.Background()) + ts = tsoutil.ComposeTS(physical, logical) + re.Less(lastTS, ts) return default: } } }() } - wg.Wait() }
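
Note on the failpoint lifecycle these tests rely on: the fastUpdatePhysicalInterval and skipUpdateServiceMode failpoints only take effect in a binary built with failpoints activated (pingcap/failpoint's failpoint-ctl rewrites the failpoint.Inject markers before compilation). Also, a deferred cleanup written as `defer re.NoError(failpoint.Enable(...))` evaluates the Enable call immediately and never switches the failpoint off at test exit; the conventional pairing is Enable up front and Disable in a deferred closure. A minimal sketch, assuming the same failpoint path; the test name is illustrative and not part of this change:

package tso

import (
	"testing"

	"github.com/pingcap/failpoint"
	"github.com/stretchr/testify/require"
)

func TestWithFastPhysicalInterval(t *testing.T) {
	re := require.New(t)
	// Enable the failpoint for the duration of this test only.
	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", `return(true)`))
	// Disable it again at test exit. Wrapping the call in a closure defers the
	// whole statement; `defer re.NoError(failpoint.Enable(...))` would run
	// Enable immediately and leave the failpoint switched on for later tests.
	defer func() {
		re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval"))
	}()
	// ... drive the TSO clients here, as the tests above do ...
}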
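
The testutil change also adds SetupClient (a thin pd.NewClientWithContext wrapper) next to SetupClientWithKeyspace, but none of the tests in this series exercise it yet. A minimal usage sketch in the same style as checkTSO, assuming a running test cluster whose leader address is in backendEndpoints; the helper name checkSetupClient is illustrative:

func checkSetupClient(ctx context.Context, re *require.Assertions, backendEndpoints string) {
	cli := mcs.SetupClient(ctx, re, strings.Split(backendEndpoints, ","))
	physical, logical, err := cli.GetTS(ctx)
	re.NoError(err)
	// A freshly composed TSO should always be non-zero.
	re.Less(uint64(0), tsoutil.ComposeTS(physical, logical))
}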