diff --git a/pkg/syncer/client.go b/pkg/syncer/client.go index f61ce320a74..00dd8c5107d 100644 --- a/pkg/syncer/client.go +++ b/pkg/syncer/client.go @@ -19,6 +19,8 @@ import ( "time" "github.com/docker/go-units" + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" @@ -77,6 +79,11 @@ func (s *RegionSyncer) syncRegion(ctx context.Context, conn *grpc.ClientConn) (C var regionGuide = core.GenerateRegionGuideFunc(false) +// IsRunning returns whether the region syncer client is running. +func (s *RegionSyncer) IsRunning() bool { + return s.streamingRunning.Load() +} + // StartSyncWithLeader starts to sync with leader. func (s *RegionSyncer) StartSyncWithLeader(addr string) { s.wg.Add(1) @@ -89,6 +96,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { go func() { defer logutil.LogPanic() defer s.wg.Done() + defer s.streamingRunning.Store(false) // used to load region from kv storage to cache storage. bc := s.server.GetBasicCluster() regionStorage := s.server.GetStorage() @@ -132,6 +140,9 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { } stream, err := s.syncRegion(ctx, conn) + failpoint.Inject("disableClientStreaming", func() { + err = errors.Errorf("no stream") + }) if err != nil { if ev, ok := status.FromError(err); ok { if ev.Code() == codes.Canceled { @@ -142,11 +153,11 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { time.Sleep(time.Second) continue } - log.Info("server starts to synchronize with leader", zap.String("server", s.server.Name()), zap.String("leader", s.server.GetLeader().GetName()), zap.Uint64("request-index", s.history.GetNextIndex())) for { resp, err := stream.Recv() if err != nil { + s.streamingRunning.Store(false) log.Error("region sync with leader meet error", errs.ZapError(errs.ErrGRPCRecv, err)) if err = stream.CloseSend(); err != nil { log.Error("failed to terminate client stream", errs.ZapError(errs.ErrGRPCCloseSend, err)) @@ -212,6 +223,8 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { _ = regionStorage.DeleteRegion(old.GetMeta()) } } + // mark the client as running status when it finished the first history region sync. + s.streamingRunning.Store(true) } } }() diff --git a/pkg/syncer/server.go b/pkg/syncer/server.go index 7d339e75dbe..4fb38614de0 100644 --- a/pkg/syncer/server.go +++ b/pkg/syncer/server.go @@ -18,6 +18,7 @@ import ( "context" "io" "sync" + "sync/atomic" "time" "github.com/docker/go-units" @@ -83,6 +84,8 @@ type RegionSyncer struct { history *historyBuffer limit *ratelimit.RateLimiter tlsConfig *grpcutil.TLSConfig + // status when as client + streamingRunning atomic.Bool } // NewRegionSyncer returns a region syncer. @@ -228,7 +231,16 @@ func (s *RegionSyncer) syncHistoryRegion(ctx context.Context, request *pdpb.Sync if s.history.GetNextIndex() == startIndex { log.Info("requested server has already in sync with server", zap.String("requested-server", name), zap.String("server", s.server.Name()), zap.Uint64("last-index", startIndex)) - return nil + // still send a response to follower to show the history region sync. + resp := &pdpb.SyncRegionResponse{ + Header: &pdpb.ResponseHeader{ClusterId: s.server.ClusterID()}, + Regions: nil, + StartIndex: startIndex, + RegionStats: nil, + RegionLeaders: nil, + Buckets: nil, + } + return stream.Send(resp) } // do full synchronization if startIndex == 0 { diff --git a/server/server.go b/server/server.go index 76893c24388..43daa65d844 100644 --- a/server/server.go +++ b/server/server.go @@ -1363,6 +1363,12 @@ func (s *Server) GetRaftCluster() *cluster.RaftCluster { return s.cluster } +// DirectlyGetRaftCluster returns raft cluster directly. +// Only used for test. +func (s *Server) DirectlyGetRaftCluster() *cluster.RaftCluster { + return s.cluster +} + // GetCluster gets cluster. func (s *Server) GetCluster() *metapb.Cluster { return &metapb.Cluster{ diff --git a/tests/server/region_syncer/region_syncer_test.go b/tests/server/region_syncer/region_syncer_test.go index 6521432c0dc..87b5c0683c7 100644 --- a/tests/server/region_syncer/region_syncer_test.go +++ b/tests/server/region_syncer/region_syncer_test.go @@ -47,21 +47,34 @@ func (i *idAllocator) alloc() uint64 { func TestRegionSyncer(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) - defer cancel() re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/storage/regionStorageFastFlush", `return(true)`)) re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/syncer/noFastExitSync", `return(true)`)) + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/syncer/disableClientStreaming", `return(true)`)) cluster, err := tests.NewTestCluster(ctx, 3, func(conf *config.Config, serverName string) { conf.PDServerCfg.UseRegionStorage = true }) - defer cluster.Destroy() + defer func() { + cluster.Destroy() + cancel() + }() re.NoError(err) re.NoError(cluster.RunInitialServers()) cluster.WaitLeader() leaderServer := cluster.GetLeaderServer() + re.NoError(leaderServer.BootstrapCluster()) rc := leaderServer.GetServer().GetRaftCluster() re.NotNil(rc) + followerServer := cluster.GetServer(cluster.GetFollower()) + + testutil.Eventually(re, func() bool { + return !followerServer.GetServer().DirectlyGetRaftCluster().GetRegionSyncer().IsRunning() + }) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/syncer/disableClientStreaming")) re.True(cluster.WaitRegionSyncerClientsReady(2)) + testutil.Eventually(re, func() bool { + return followerServer.GetServer().DirectlyGetRaftCluster().GetRegionSyncer().IsRunning() + }) regionLen := 110 regions := initRegions(regionLen) @@ -119,7 +132,6 @@ func TestRegionSyncer(t *testing.T) { time.Sleep(4 * time.Second) // test All regions have been synchronized to the cache of followerServer - followerServer := cluster.GetServer(cluster.GetFollower()) re.NotNil(followerServer) cacheRegions := leaderServer.GetServer().GetBasicCluster().GetRegions() re.Len(cacheRegions, regionLen) @@ -141,6 +153,9 @@ func TestRegionSyncer(t *testing.T) { re.NoError(err) cluster.WaitLeader() leaderServer = cluster.GetLeaderServer() + testutil.Eventually(re, func() bool { + return !leaderServer.GetServer().GetRaftCluster().GetRegionSyncer().IsRunning() + }) re.NotNil(leaderServer) loadRegions := leaderServer.GetServer().GetRaftCluster().GetRegions() re.Len(loadRegions, regionLen)