Skip to content

Commit

Permalink
solve the conflicts
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Nov 13, 2023
1 parent b8885a7 commit f48ff19
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 26 deletions.
6 changes: 3 additions & 3 deletions pkg/schedule/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,9 @@ func (c *Coordinator) RunUntilStop() {
c.Run()
<-c.ctx.Done()
log.Info("coordinator is stopping")
c.GetSchedulersController().Wait()
c.wg.Wait()
log.Info("coordinator has been stopped")
}

// Run starts the coordinator.
Expand Down Expand Up @@ -576,9 +579,6 @@ func (c *Coordinator) waitPluginUnload(pluginPath, schedulerName string, ch chan
// Stop stops the coordinator.
func (c *Coordinator) Stop() {
c.cancel()
c.GetSchedulersController().Wait()
c.wg.Wait()
log.Info("coordinator has been stopped")
}

// GetHotRegionsByType gets hot regions' statistics by RWType.
Expand Down
54 changes: 40 additions & 14 deletions server/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/tso"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/tsoutil"
Expand Down Expand Up @@ -245,11 +246,15 @@ func forwardRegionHeartbeatToScheduling(forwardStream schedulingpb.Scheduling_Re
defer close(errCh)
for {
resp, err := forwardStream.Recv()
if err == io.EOF {
errCh <- errors.WithStack(err)
return

Check warning on line 251 in server/forward.go

View check run for this annotation

Codecov / codecov/patch

server/forward.go#L250-L251

Added lines #L250 - L251 were not covered by tests
}
if err != nil {
errCh <- errors.WithStack(err)
return
}
// The error types defined for tsopb and pdpb are different, so we need to convert them.
// The error types defined for schedulingpb and pdpb are different, so we need to convert them.
var pdpbErr *pdpb.Error
schedulingpbErr := resp.GetHeader().GetError()
if schedulingpbErr != nil {
Expand Down Expand Up @@ -395,7 +400,10 @@ func (s *GrpcServer) isLocalRequest(forwardedHost string) bool {
return false
}

func (s *GrpcServer) getGlobalTSOFromTSOServer(ctx context.Context) (pdpb.Timestamp, error) {
func (s *GrpcServer) getGlobalTSO(ctx context.Context) (pdpb.Timestamp, error) {
if !s.IsAPIServiceMode() {
return s.tsoAllocatorManager.HandleRequest(ctx, tso.GlobalDCLocation, 1)
}
request := &tsopb.TsoRequest{
Header: &tsopb.RequestHeader{
ClusterId: s.clusterID,
Expand All @@ -409,36 +417,54 @@ func (s *GrpcServer) getGlobalTSOFromTSOServer(ctx context.Context) (pdpb.Timest
forwardStream tsopb.TSO_TsoClient
ts *tsopb.TsoResponse
err error
ok bool
)
handleStreamError := func(err error) (needRetry bool) {
if strings.Contains(err.Error(), errs.NotLeaderErr) {
s.tsoPrimaryWatcher.ForceLoad()
log.Warn("force to load tso primary address due to error", zap.Error(err), zap.String("tso-addr", forwardedHost))
return true
}
if grpcutil.NeedRebuildConnection(err) {
s.tsoClientPool.Lock()
delete(s.tsoClientPool.clients, forwardedHost)
s.tsoClientPool.Unlock()
log.Warn("client connection removed due to error", zap.Error(err), zap.String("tso-addr", forwardedHost))
return true
}
return false

Check warning on line 435 in server/forward.go

View check run for this annotation

Codecov / codecov/patch

server/forward.go#L435

Added line #L435 was not covered by tests
}
for i := 0; i < maxRetryTimesRequestTSOServer; i++ {
forwardedHost, ok := s.GetServicePrimaryAddr(ctx, utils.TSOServiceName)
if i > 0 {
time.Sleep(retryIntervalRequestTSOServer)
}
forwardedHost, ok = s.GetServicePrimaryAddr(ctx, utils.TSOServiceName)
if !ok || forwardedHost == "" {
return pdpb.Timestamp{}, ErrNotFoundTSOAddr
}
forwardStream, err = s.getTSOForwardStream(forwardedHost)
if err != nil {
return pdpb.Timestamp{}, err

Check warning on line 447 in server/forward.go

View check run for this annotation

Codecov / codecov/patch

server/forward.go#L447

Added line #L447 was not covered by tests
}
forwardStream.Send(request)
ts, err = forwardStream.Recv()
err = forwardStream.Send(request)
if err != nil {
if strings.Contains(err.Error(), errs.NotLeaderErr) {
s.tsoPrimaryWatcher.ForceLoad()
time.Sleep(retryIntervalRequestTSOServer)
if needRetry := handleStreamError(err); needRetry {
continue
}
if strings.Contains(err.Error(), codes.Unavailable.String()) {
s.tsoClientPool.Lock()
delete(s.tsoClientPool.clients, forwardedHost)
s.tsoClientPool.Unlock()
log.Error("send request to tso primary server failed", zap.Error(err), zap.String("tso-addr", forwardedHost))
return pdpb.Timestamp{}, err

Check warning on line 455 in server/forward.go

View check run for this annotation

Codecov / codecov/patch

server/forward.go#L454-L455

Added lines #L454 - L455 were not covered by tests
}
ts, err = forwardStream.Recv()
if err != nil {
if needRetry := handleStreamError(err); needRetry {
continue
}
log.Error("get global tso from tso service primary addr failed", zap.Error(err), zap.String("tso-addr", forwardedHost))
log.Error("receive response from tso primary server failed", zap.Error(err), zap.String("tso-addr", forwardedHost))
return pdpb.Timestamp{}, err

Check warning on line 463 in server/forward.go

View check run for this annotation

Codecov / codecov/patch

server/forward.go#L462-L463

Added lines #L462 - L463 were not covered by tests
}
return *ts.GetTimestamp(), nil
}
log.Error("get global tso from tso service primary addr failed after retry", zap.Error(err), zap.String("tso-addr", forwardedHost))
log.Error("get global tso from tso primary server failed after retry", zap.Error(err), zap.String("tso-addr", forwardedHost))
return pdpb.Timestamp{}, err

Check warning on line 468 in server/forward.go

View check run for this annotation

Codecov / codecov/patch

server/forward.go#L467-L468

Added lines #L467 - L468 were not covered by tests
}

Expand Down
34 changes: 25 additions & 9 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -864,9 +864,9 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear
Stats: request.GetStats(),
}
if _, err := cli.StoreHeartbeat(ctx, req); err != nil {
log.Info("forward store heartbeat failed", zap.Error(err))
log.Debug("forward store heartbeat failed", zap.Error(err))

Check warning on line 867 in server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

server/grpc_service.go#L867

Added line #L867 was not covered by tests
// Reset the cached scheduling client so that it can be updated in the next request.
// s.schedulingClient.CompareAndSwap(forwardCli, &schedulingClient{})
s.schedulingClient.CompareAndSwap(forwardCli, &schedulingClient{})

Check warning on line 869 in server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

server/grpc_service.go#L869

Added line #L869 was not covered by tests
}
}
}
Expand Down Expand Up @@ -1063,6 +1063,7 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error
errCh chan error
forwardStream pdpb.PD_RegionHeartbeatClient
lastForwardedHost string
forwardErrCh chan error
forwardSchedulingStream schedulingpb.Scheduling_RegionHeartbeatClient
lastForwardedSchedulingHost string
)
Expand Down Expand Up @@ -1094,7 +1095,7 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error
if err != nil {
return err
}
log.Info("create bucket report forward stream", zap.String("forwarded-host", forwardedHost))
log.Info("create region heartbeat forward stream", zap.String("forwarded-host", forwardedHost))
forwardStream, cancel, err = s.createRegionHeartbeatForwardStream(client)

Check warning on line 1099 in server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

server/grpc_service.go#L1099

Added line #L1099 was not covered by tests
if err != nil {
return err
Expand Down Expand Up @@ -1185,9 +1186,22 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "ok").Inc()

if s.IsServiceIndependent(utils.SchedulingServiceName) {
if forwardErrCh != nil {
select {
case err, ok := <-forwardErrCh:
if ok {
if cancel != nil {
cancel()

Check warning on line 1194 in server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

server/grpc_service.go#L1191-L1194

Added lines #L1191 - L1194 were not covered by tests
}
forwardSchedulingStream = nil
log.Error("meet error and need to re-establish the stream", zap.Error(err))

Check warning on line 1197 in server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

server/grpc_service.go#L1196-L1197

Added lines #L1196 - L1197 were not covered by tests
}
default:
}
}
forwardedSchedulingHost, ok := s.GetServicePrimaryAddr(stream.Context(), utils.SchedulingServiceName)
if !ok || len(forwardedSchedulingHost) == 0 {
log.Error("failed to find scheduling service primary address")
log.Debug("failed to find scheduling service primary address")
if cancel != nil {
cancel()

Check warning on line 1206 in server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

server/grpc_service.go#L1204-L1206

Added lines #L1204 - L1206 were not covered by tests
}
Expand All @@ -1209,8 +1223,8 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error
continue

Check warning on line 1223 in server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

server/grpc_service.go#L1222-L1223

Added lines #L1222 - L1223 were not covered by tests
}
lastForwardedSchedulingHost = forwardedSchedulingHost
errCh = make(chan error, 1)
go forwardRegionHeartbeatToScheduling(forwardSchedulingStream, server, errCh)
forwardErrCh = make(chan error, 1)
go forwardRegionHeartbeatToScheduling(forwardSchedulingStream, server, forwardErrCh)
}
schedulingpbReq := &schedulingpb.RegionHeartbeatRequest{
Header: &schedulingpb.RequestHeader{
Expand All @@ -1237,11 +1251,13 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error
}

select {
case err := <-errCh:
log.Error("failed to send response", zap.Error(err))
case err, ok := <-forwardErrCh:
if ok {
forwardSchedulingStream = nil
log.Error("failed to send response", zap.Error(err))

Check warning on line 1257 in server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

server/grpc_service.go#L1254-L1257

Added lines #L1254 - L1257 were not covered by tests
}
default:
}
continue
}
}
}
Expand Down

0 comments on commit f48ff19

Please sign in to comment.