diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index 5f6717e0d33..8fb9ec8b286 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -372,6 +372,9 @@ func (c *Coordinator) RunUntilStop() { c.Run() <-c.ctx.Done() log.Info("coordinator is stopping") + c.GetSchedulersController().Wait() + c.wg.Wait() + log.Info("coordinator has been stopped") } // Run starts coordinator. @@ -576,9 +579,6 @@ func (c *Coordinator) waitPluginUnload(pluginPath, schedulerName string, ch chan // Stop stops the coordinator. func (c *Coordinator) Stop() { c.cancel() - c.GetSchedulersController().Wait() - c.wg.Wait() - log.Info("coordinator has been stopped") } // GetHotRegionsByType gets hot regions' statistics by RWType. diff --git a/server/forward.go b/server/forward.go index 5aafb3cdd56..e765d442539 100644 --- a/server/forward.go +++ b/server/forward.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/mcs/utils" + "github.com/tikv/pd/pkg/tso" "github.com/tikv/pd/pkg/utils/grpcutil" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/tsoutil" @@ -245,11 +246,15 @@ func forwardRegionHeartbeatToScheduling(forwardStream schedulingpb.Scheduling_Re defer close(errCh) for { resp, err := forwardStream.Recv() + if err == io.EOF { + errCh <- errors.WithStack(err) + return + } if err != nil { errCh <- errors.WithStack(err) return } - // The error types defined for tsopb and pdpb are different, so we need to convert them. + // The error types defined for schedulingpb and pdpb are different, so we need to convert them. var pdpbErr *pdpb.Error schedulingpbErr := resp.GetHeader().GetError() if schedulingpbErr != nil { @@ -395,7 +400,10 @@ func (s *GrpcServer) isLocalRequest(forwardedHost string) bool { return false } -func (s *GrpcServer) getGlobalTSOFromTSOServer(ctx context.Context) (pdpb.Timestamp, error) { +func (s *GrpcServer) getGlobalTSO(ctx context.Context) (pdpb.Timestamp, error) { + if !s.IsAPIServiceMode() { + return s.tsoAllocatorManager.HandleRequest(ctx, tso.GlobalDCLocation, 1) + } request := &tsopb.TsoRequest{ Header: &tsopb.RequestHeader{ ClusterId: s.clusterID, @@ -409,9 +417,28 @@ func (s *GrpcServer) getGlobalTSOFromTSOServer(ctx context.Context) (pdpb.Timest forwardStream tsopb.TSO_TsoClient ts *tsopb.TsoResponse err error + ok bool ) + handleStreamError := func(err error) (needRetry bool) { + if strings.Contains(err.Error(), errs.NotLeaderErr) { + s.tsoPrimaryWatcher.ForceLoad() + log.Warn("force to load tso primary address due to error", zap.Error(err), zap.String("tso-addr", forwardedHost)) + return true + } + if grpcutil.NeedRebuildConnection(err) { + s.tsoClientPool.Lock() + delete(s.tsoClientPool.clients, forwardedHost) + s.tsoClientPool.Unlock() + log.Warn("client connection removed due to error", zap.Error(err), zap.String("tso-addr", forwardedHost)) + return true + } + return false + } for i := 0; i < maxRetryTimesRequestTSOServer; i++ { - forwardedHost, ok := s.GetServicePrimaryAddr(ctx, utils.TSOServiceName) + if i > 0 { + time.Sleep(retryIntervalRequestTSOServer) + } + forwardedHost, ok = s.GetServicePrimaryAddr(ctx, utils.TSOServiceName) if !ok || forwardedHost == "" { return pdpb.Timestamp{}, ErrNotFoundTSOAddr } @@ -419,26 +446,25 @@ func (s *GrpcServer) getGlobalTSOFromTSOServer(ctx context.Context) (pdpb.Timest if err != nil { return pdpb.Timestamp{}, err } - forwardStream.Send(request) - ts, err = forwardStream.Recv() + err = forwardStream.Send(request) if err != nil { - if strings.Contains(err.Error(), errs.NotLeaderErr) { - s.tsoPrimaryWatcher.ForceLoad() - time.Sleep(retryIntervalRequestTSOServer) + if needRetry := handleStreamError(err); needRetry { continue } - if strings.Contains(err.Error(), codes.Unavailable.String()) { - s.tsoClientPool.Lock() - delete(s.tsoClientPool.clients, forwardedHost) - s.tsoClientPool.Unlock() + log.Error("send request to tso primary server failed", zap.Error(err), zap.String("tso-addr", forwardedHost)) + return pdpb.Timestamp{}, err + } + ts, err = forwardStream.Recv() + if err != nil { + if needRetry := handleStreamError(err); needRetry { continue } - log.Error("get global tso from tso service primary addr failed", zap.Error(err), zap.String("tso-addr", forwardedHost)) + log.Error("receive response from tso primary server failed", zap.Error(err), zap.String("tso-addr", forwardedHost)) return pdpb.Timestamp{}, err } return *ts.GetTimestamp(), nil } - log.Error("get global tso from tso service primary addr failed after retry", zap.Error(err), zap.String("tso-addr", forwardedHost)) + log.Error("get global tso from tso primary server failed after retry", zap.Error(err), zap.String("tso-addr", forwardedHost)) return pdpb.Timestamp{}, err } diff --git a/server/grpc_service.go b/server/grpc_service.go index 18b0117ad4e..b0384a7d629 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -864,9 +864,9 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear Stats: request.GetStats(), } if _, err := cli.StoreHeartbeat(ctx, req); err != nil { - log.Info("forward store heartbeat failed", zap.Error(err)) + log.Debug("forward store heartbeat failed", zap.Error(err)) // reset to let it be updated in the next request - // s.schedulingClient.CompareAndSwap(forwardCli, &schedulingClient{}) + s.schedulingClient.CompareAndSwap(forwardCli, &schedulingClient{}) } } } @@ -1063,6 +1063,7 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error errCh chan error forwardStream pdpb.PD_RegionHeartbeatClient lastForwardedHost string + forwardErrCh chan error forwardSchedulingStream schedulingpb.Scheduling_RegionHeartbeatClient lastForwardedSchedulingHost string ) @@ -1094,7 +1095,7 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error if err != nil { return err } - log.Info("create bucket report forward stream", zap.String("forwarded-host", forwardedHost)) + log.Info("create region heartbeat forward stream", zap.String("forwarded-host", forwardedHost)) forwardStream, cancel, err = s.createRegionHeartbeatForwardStream(client) if err != nil { return err @@ -1185,9 +1186,22 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "ok").Inc() if s.IsServiceIndependent(utils.SchedulingServiceName) { + if forwardErrCh != nil { + select { + case err, ok := <-forwardErrCh: + if ok { + if cancel != nil { + cancel() + } + forwardSchedulingStream = nil + log.Error("meet error and need to re-establish the stream", zap.Error(err)) + } + default: + } + } forwardedSchedulingHost, ok := s.GetServicePrimaryAddr(stream.Context(), utils.SchedulingServiceName) if !ok || len(forwardedSchedulingHost) == 0 { - log.Error("failed to find scheduling service primary address") + log.Debug("failed to find scheduling service primary address") if cancel != nil { cancel() } @@ -1209,8 +1223,8 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error continue } lastForwardedSchedulingHost = forwardedSchedulingHost - errCh = make(chan error, 1) - go forwardRegionHeartbeatToScheduling(forwardSchedulingStream, server, errCh) + forwardErrCh = make(chan error, 1) + go forwardRegionHeartbeatToScheduling(forwardSchedulingStream, server, forwardErrCh) } schedulingpbReq := &schedulingpb.RegionHeartbeatRequest{ Header: &schedulingpb.RequestHeader{ @@ -1237,11 +1251,13 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error } select { - case err := <-errCh: - log.Error("failed to send response", zap.Error(err)) + case err, ok := <-forwardErrCh: + if ok { + forwardSchedulingStream = nil + log.Error("failed to send response", zap.Error(err)) + } default: } - continue } } }