From 3f64d88e7afb359687541f7f634a96c4692de65d Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Wed, 22 Jan 2025 17:29:27 +0800 Subject: [PATCH 1/2] exit broadcast as soon as possible Signed-off-by: Ryan Leung --- pkg/syncer/server.go | 65 ++++++++++++++++++++++++++------------------ 1 file changed, 38 insertions(+), 27 deletions(-) diff --git a/pkg/syncer/server.go b/pkg/syncer/server.go index 150ff738c15..549ef1da619 100644 --- a/pkg/syncer/server.go +++ b/pkg/syncer/server.go @@ -37,6 +37,7 @@ import ( "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/grpcutil" "github.com/tikv/pd/pkg/utils/keypath" + "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/syncutil" ) @@ -79,11 +80,12 @@ type RegionSyncer struct { clientCtx context.Context clientCancel context.CancelFunc } - server Server - wg sync.WaitGroup - history *historyBuffer - limit *ratelimit.RateLimiter - tlsConfig *grpcutil.TLSConfig + broadcastDone chan struct{} + server Server + wg sync.WaitGroup + history *historyBuffer + limit *ratelimit.RateLimiter + tlsConfig *grpcutil.TLSConfig // status when as client streamingRunning atomic.Bool } @@ -96,10 +98,11 @@ func NewRegionSyncer(s Server) *RegionSyncer { return nil } syncer := &RegionSyncer{ - server: s, - history: newHistoryBuffer(defaultHistoryBufferSize, regionStorage.(kv.Base)), - limit: ratelimit.NewRateLimiter(defaultBucketRate, defaultBucketCapacity), - tlsConfig: s.GetTLSConfig(), + server: s, + history: newHistoryBuffer(defaultHistoryBufferSize, regionStorage.(kv.Base)), + limit: ratelimit.NewRateLimiter(defaultBucketRate, defaultBucketCapacity), + tlsConfig: s.GetTLSConfig(), + broadcastDone: make(chan struct{}, 1), } syncer.mu.streams = make(map[string]ServerStream) return syncer @@ -160,13 +163,13 @@ func (s *RegionSyncer) RunServer(ctx context.Context, regionNotifier <-chan *cor RegionLeaders: leaders, Buckets: buckets, } - s.broadcast(regions) + s.broadcast(ctx, regions) case <-ticker.C: alive := &pdpb.SyncRegionResponse{ Header: &pdpb.ResponseHeader{ClusterId: keypath.ClusterID()}, StartIndex: s.history.getNextIndex(), } - s.broadcast(alive) + s.broadcast(ctx, alive) } requests = requests[:0] stats = stats[:0] @@ -344,23 +347,31 @@ func (s *RegionSyncer) bindStream(name string, stream ServerStream) { s.mu.streams[name] = stream } -func (s *RegionSyncer) broadcast(regions *pdpb.SyncRegionResponse) { - var failed []string - s.mu.RLock() - for name, sender := range s.mu.streams { - err := sender.Send(regions) - if err != nil { - log.Error("region syncer send data meet error", errs.ZapError(errs.ErrGRPCSend, err)) - failed = append(failed, name) +func (s *RegionSyncer) broadcast(ctx context.Context, regions *pdpb.SyncRegionResponse) { + go func() { + defer logutil.LogPanic() + var failed []string + s.mu.RLock() + for name, sender := range s.mu.streams { + err := sender.Send(regions) + if err != nil { + log.Error("region syncer send data meet error", errs.ZapError(errs.ErrGRPCSend, err)) + failed = append(failed, name) + } } - } - s.mu.RUnlock() - if len(failed) > 0 { - s.mu.Lock() - for _, name := range failed { - delete(s.mu.streams, name) - log.Info("region syncer delete the stream", zap.String("stream", name)) + s.mu.RUnlock() + if len(failed) > 0 { + s.mu.Lock() + for _, name := range failed { + delete(s.mu.streams, name) + log.Info("region syncer delete the stream", zap.String("stream", name)) + } + s.mu.Unlock() } - s.mu.Unlock() + s.broadcastDone <- struct{}{} + }() + select { + case <-s.broadcastDone: + case <-ctx.Done(): } } From 995d25c9bc8195907b28a949790952ac2e53be72 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Thu, 23 Jan 2025 14:49:09 +0800 Subject: [PATCH 2/2] address the comment Signed-off-by: Ryan Leung --- pkg/syncer/server.go | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/pkg/syncer/server.go b/pkg/syncer/server.go index 549ef1da619..d7daefc6f5b 100644 --- a/pkg/syncer/server.go +++ b/pkg/syncer/server.go @@ -80,12 +80,11 @@ type RegionSyncer struct { clientCtx context.Context clientCancel context.CancelFunc } - broadcastDone chan struct{} - server Server - wg sync.WaitGroup - history *historyBuffer - limit *ratelimit.RateLimiter - tlsConfig *grpcutil.TLSConfig + server Server + wg sync.WaitGroup + history *historyBuffer + limit *ratelimit.RateLimiter + tlsConfig *grpcutil.TLSConfig // status when as client streamingRunning atomic.Bool } @@ -98,11 +97,10 @@ func NewRegionSyncer(s Server) *RegionSyncer { return nil } syncer := &RegionSyncer{ - server: s, - history: newHistoryBuffer(defaultHistoryBufferSize, regionStorage.(kv.Base)), - limit: ratelimit.NewRateLimiter(defaultBucketRate, defaultBucketCapacity), - tlsConfig: s.GetTLSConfig(), - broadcastDone: make(chan struct{}, 1), + server: s, + history: newHistoryBuffer(defaultHistoryBufferSize, regionStorage.(kv.Base)), + limit: ratelimit.NewRateLimiter(defaultBucketRate, defaultBucketCapacity), + tlsConfig: s.GetTLSConfig(), } syncer.mu.streams = make(map[string]ServerStream) return syncer @@ -348,6 +346,7 @@ func (s *RegionSyncer) bindStream(name string, stream ServerStream) { } func (s *RegionSyncer) broadcast(ctx context.Context, regions *pdpb.SyncRegionResponse) { + broadcastDone := make(chan struct{}, 1) go func() { defer logutil.LogPanic() var failed []string @@ -368,10 +367,10 @@ func (s *RegionSyncer) broadcast(ctx context.Context, regions *pdpb.SyncRegionRe } s.mu.Unlock() } - s.broadcastDone <- struct{}{} + close(broadcastDone) }() select { - case <-s.broadcastDone: + case <-broadcastDone: case <-ctx.Done(): } }