Skip to content

Commit

Permalink
mcs: close the connection to avoid retrying to connect (#7757)
Browse files Browse the repository at this point in the history
ref #5839

Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx authored Jan 31, 2024
1 parent 4e48f5b commit 9721654
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 27 deletions.
4 changes: 2 additions & 2 deletions pkg/utils/grpcutil/grpcutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,11 +252,11 @@ func CheckStream(ctx context.Context, cancel context.CancelFunc, done chan struc

// NeedRebuildConnection checks if the error is a connection error.
func NeedRebuildConnection(err error) bool {
return err == io.EOF ||
return (err != nil) && (err == io.EOF ||
strings.Contains(err.Error(), codes.Unavailable.String()) || // Unavailable indicates the service is currently unavailable. This is a most likely a transient condition.
strings.Contains(err.Error(), codes.DeadlineExceeded.String()) || // DeadlineExceeded means operation expired before completion.
strings.Contains(err.Error(), codes.Internal.String()) || // Internal errors.
strings.Contains(err.Error(), codes.Unknown.String()) || // Unknown error.
strings.Contains(err.Error(), codes.ResourceExhausted.String()) // ResourceExhausted is returned when either the client or the server has exhausted their resources.
strings.Contains(err.Error(), codes.ResourceExhausted.String())) // ResourceExhausted is returned when either the client or the server has exhausted their resources.
// Besides, we don't need to rebuild the connection if the code is Canceled, which means the client cancelled the request.
}
40 changes: 18 additions & 22 deletions server/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,17 @@ func (s *GrpcServer) forwardTSO(stream pdpb.PD_TsoServer) error {
forwardStream tsopb.TSO_TsoClient
forwardCtx context.Context
cancelForward context.CancelFunc
tsoStreamErr error
lastForwardedHost string
)
defer func() {
s.concurrentTSOProxyStreamings.Add(-1)
if cancelForward != nil {
cancelForward()
}
if grpcutil.NeedRebuildConnection(tsoStreamErr) {
s.closeDelegateClient(lastForwardedHost)
}
}()

maxConcurrentTSOProxyStreamings := int32(s.GetMaxConcurrentTSOProxyStreamings())
Expand Down Expand Up @@ -131,7 +135,8 @@ func (s *GrpcServer) forwardTSO(stream pdpb.PD_TsoServer) error {

forwardedHost, ok := s.GetServicePrimaryAddr(stream.Context(), utils.TSOServiceName)
if !ok || len(forwardedHost) == 0 {
return errors.WithStack(ErrNotFoundTSOAddr)
tsoStreamErr = errors.WithStack(ErrNotFoundTSOAddr)
return tsoStreamErr
}
if forwardStream == nil || lastForwardedHost != forwardedHost {
if cancelForward != nil {
Expand All @@ -140,18 +145,21 @@ func (s *GrpcServer) forwardTSO(stream pdpb.PD_TsoServer) error {

clientConn, err := s.getDelegateClient(s.ctx, forwardedHost)
if err != nil {
return errors.WithStack(err)
tsoStreamErr = errors.WithStack(err)
return tsoStreamErr
}
forwardStream, forwardCtx, cancelForward, err = s.createTSOForwardStream(stream.Context(), clientConn)
if err != nil {
return errors.WithStack(err)
tsoStreamErr = errors.WithStack(err)
return tsoStreamErr
}
lastForwardedHost = forwardedHost
}

tsopbResp, err := s.forwardTSORequestWithDeadLine(forwardCtx, cancelForward, forwardStream, request, tsDeadlineCh)
if err != nil {
return errors.WithStack(err)
tsoStreamErr = errors.WithStack(err)
return tsoStreamErr
}

// The error types defined for tsopb and pdpb are different, so we need to convert them.
Expand Down Expand Up @@ -363,25 +371,13 @@ func (s *GrpcServer) getDelegateClient(ctx context.Context, forwardedHost string
return conn.(*grpc.ClientConn), nil
}

func (s *GrpcServer) getForwardedHost(ctx, streamCtx context.Context, serviceName ...string) (forwardedHost string, err error) {
if s.IsAPIServiceMode() {
var ok bool
if len(serviceName) == 0 {
return "", ErrNotFoundService
}
forwardedHost, ok = s.GetServicePrimaryAddr(ctx, serviceName[0])
if !ok || len(forwardedHost) == 0 {
switch serviceName[0] {
case utils.TSOServiceName:
return "", ErrNotFoundTSOAddr
case utils.SchedulingServiceName:
return "", ErrNotFoundSchedulingAddr
}
}
} else if fh := grpcutil.GetForwardedHost(streamCtx); !s.isLocalRequest(fh) {
forwardedHost = fh
func (s *GrpcServer) closeDelegateClient(forwardedHost string) {
client, ok := s.clientConns.LoadAndDelete(forwardedHost)
if !ok {
return
}
return forwardedHost, nil
client.(*grpc.ClientConn).Close()
log.Debug("close delegate client connection", zap.String("forwarded-host", forwardedHost))
}

func (s *GrpcServer) isLocalRequest(host string) bool {
Expand Down
9 changes: 6 additions & 3 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,9 +547,8 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error {
return errors.WithStack(err)
}

if forwardedHost, err := s.getForwardedHost(ctx, stream.Context(), utils.TSOServiceName); err != nil {
return err
} else if len(forwardedHost) > 0 {
forwardedHost := grpcutil.GetForwardedHost(stream.Context())
if !s.isLocalRequest(forwardedHost) {
clientConn, err := s.getDelegateClient(s.ctx, forwardedHost)
if err != nil {
return errors.WithStack(err)
Expand Down Expand Up @@ -1332,6 +1331,7 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error
if cancel != nil {
cancel()
}

client, err := s.getDelegateClient(s.ctx, forwardedSchedulingHost)
if err != nil {
errRegionHeartbeatClient.Inc()
Expand Down Expand Up @@ -1370,6 +1370,9 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error
}
if err := forwardSchedulingStream.Send(schedulingpbReq); err != nil {
forwardSchedulingStream = nil
if grpcutil.NeedRebuildConnection(err) {
s.closeDelegateClient(lastForwardedSchedulingHost)
}
errRegionHeartbeatSend.Inc()
log.Error("failed to send request to scheduling service", zap.Error(err))
}
Expand Down

0 comments on commit 9721654

Please sign in to comment.