Skip to content

Commit

Permalink
Merge branch 'master' into follower/add_sync_status
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Nov 30, 2023
2 parents 896abe1 + 862eee1 commit 422ab81
Show file tree
Hide file tree
Showing 51 changed files with 1,294 additions and 294 deletions.
156 changes: 81 additions & 75 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ type Client interface {
// client should retry later.
GetRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error)
// GetRegionFromMember gets a region from certain members.
GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string) (*Region, error)
GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, opts ...GetRegionOption) (*Region, error)
// GetPrevRegion gets the previous region and its leader Peer of the region where the key is located.
GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error)
// GetRegionByID gets a region and its leader Peer from PD by id.
Expand All @@ -100,7 +100,7 @@ type Client interface {
// Limit limits the maximum number of regions returned.
// If a region has no leader, corresponding leader will be placed by a peer
// with empty value (PeerID is 0).
ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*Region, error)
ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...GetRegionOption) ([]*Region, error)
// GetStore gets a store from PD by store id.
// The store may expire later. Caller is responsible for caching and taking care
// of store change.
Expand Down Expand Up @@ -200,7 +200,8 @@ func WithSkipStoreLimit() RegionsOption {

// GetRegionOp represents available options when getting regions.
type GetRegionOp struct {
needBuckets bool
needBuckets bool
allowFollowerHandle bool
}

// GetRegionOption configures GetRegionOp.
Expand All @@ -211,6 +212,11 @@ func WithBuckets() GetRegionOption {
return func(op *GetRegionOp) { op.needBuckets = true }
}

// WithAllowFollowerHandle means that client can send request to follower and let it handle this request.
func WithAllowFollowerHandle() GetRegionOption {
return func(op *GetRegionOp) { op.allowFollowerHandle = true }
}

// LeaderHealthCheckInterval might be changed in the unit to shorten the testing time.
var LeaderHealthCheckInterval = time.Second

Expand Down Expand Up @@ -701,6 +707,12 @@ func (c *client) UpdateOption(option DynamicOption, value interface{}) error {
return errors.New("[pd] invalid value type for EnableTSOFollowerProxy option, it should be bool")
}
c.option.setEnableTSOFollowerProxy(enable)
case EnableFollowerHandle:
enable, ok := value.(bool)
if !ok {
return errors.New("[pd] invalid value type for EnableFollowerHandle option, it should be bool")
}
c.option.setEnableFollowerHandle(enable)
default:
return errors.New("[pd] unsupported client option")
}
Expand Down Expand Up @@ -732,16 +744,18 @@ func (c *client) checkLeaderHealth(ctx context.Context) {
if client := c.pdSvcDiscovery.GetServingEndpointClientConn(); client != nil {
healthCli := healthpb.NewHealthClient(client)
resp, err := healthCli.Check(ctx, &healthpb.HealthCheckRequest{Service: ""})
rpcErr, ok := status.FromError(err)
failpoint.Inject("unreachableNetwork1", func() {
resp = nil
err = status.New(codes.Unavailable, "unavailable").Err()
})
rpcErr, ok := status.FromError(err)
if (ok && isNetworkError(rpcErr.Code())) || resp.GetStatus() != healthpb.HealthCheckResponse_SERVING {
atomic.StoreInt32(&(c.leaderNetworkFailure), int32(1))
} else {
atomic.StoreInt32(&(c.leaderNetworkFailure), int32(0))
}
} else {
atomic.StoreInt32(&(c.leaderNetworkFailure), int32(1))
}
}

Expand All @@ -751,8 +765,7 @@ func (c *client) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) {

ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
req := &pdpb.GetMembersRequest{Header: c.requestHeader()}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
Expand Down Expand Up @@ -811,6 +824,17 @@ func (c *client) getClient() pdpb.PDClient {
return c.leaderClient()
}

func (c *client) getClientAndContext(ctx context.Context) (pdpb.PDClient, context.Context) {
if c.option.enableForwarding && atomic.LoadInt32(&c.leaderNetworkFailure) == 1 {
backupClientConn, addr := c.backupClientConn()
if backupClientConn != nil {
log.Debug("[pd] use follower client", zap.String("addr", addr))
return pdpb.NewPDClient(backupClientConn), grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
}
}
return c.leaderClient(), ctx
}

func (c *client) GetTSAsync(ctx context.Context) TSFuture {
return c.GetLocalTSAsync(ctx, globalDCLocation)
}
Expand Down Expand Up @@ -915,44 +939,11 @@ func handleRegionResponse(res *pdpb.GetRegionResponse) *Region {
return r
}

func (c *client) GetRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetRegion", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationGetRegion.Observe(time.Since(start).Seconds()) }()
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)

options := &GetRegionOp{}
for _, opt := range opts {
opt(options)
}
req := &pdpb.GetRegionRequest{
Header: c.requestHeader(),
RegionKey: key,
NeedBuckets: options.needBuckets,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
if protoClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
}
resp, err := protoClient.GetRegion(ctx, req)
cancel()

if err = c.respForErr(cmdFailDurationGetRegion, start, err, resp.GetHeader()); err != nil {
return nil, err
}
return handleRegionResponse(resp), nil
}

func isNetworkError(code codes.Code) bool {
return code == codes.Unavailable || code == codes.DeadlineExceeded
}

func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string) (*Region, error) {
func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, opts ...GetRegionOption) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetRegionFromMember", opentracing.ChildOf(span.Context()))
defer span.Finish()
Expand Down Expand Up @@ -990,6 +981,38 @@ func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs
return handleRegionResponse(resp), nil
}

func (c *client) GetRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetRegion", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationGetRegion.Observe(time.Since(start).Seconds()) }()
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)

options := &GetRegionOp{}
for _, opt := range opts {
opt(options)
}
req := &pdpb.GetRegionRequest{
Header: c.requestHeader(),
RegionKey: key,
NeedBuckets: options.needBuckets,
}
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
}
resp, err := protoClient.GetRegion(ctx, req)
cancel()

if err = c.respForErr(cmdFailDurationGetRegion, start, err, resp.GetHeader()); err != nil {
return nil, err
}
return handleRegionResponse(resp), nil
}

func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetPrevRegion", opentracing.ChildOf(span.Context()))
Expand All @@ -1008,8 +1031,7 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegio
RegionKey: key,
NeedBuckets: options.needBuckets,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
Expand Down Expand Up @@ -1041,8 +1063,7 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...Get
RegionId: regionID,
NeedBuckets: options.needBuckets,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
Expand All @@ -1056,7 +1077,7 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...Get
return handleRegionResponse(resp), nil
}

func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*Region, error) {
func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...GetRegionOption) ([]*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.ScanRegions", opentracing.ChildOf(span.Context()))
defer span.Finish()
Expand All @@ -1076,8 +1097,7 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int)
EndKey: endKey,
Limit: int32(limit),
}
scanCtx = grpcutil.BuildForwardContext(scanCtx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, scanCtx := c.getClientAndContext(scanCtx)
if protoClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
Expand Down Expand Up @@ -1132,8 +1152,7 @@ func (c *client) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, e
Header: c.requestHeader(),
StoreId: storeID,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
Expand Down Expand Up @@ -1177,8 +1196,7 @@ func (c *client) GetAllStores(ctx context.Context, opts ...GetStoreOption) ([]*m
Header: c.requestHeader(),
ExcludeTombstoneStores: options.excludeTombstone,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
Expand All @@ -1205,8 +1223,7 @@ func (c *client) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint6
Header: c.requestHeader(),
SafePoint: safePoint,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return 0, errs.ErrClientGetProtoClient
Expand Down Expand Up @@ -1240,8 +1257,7 @@ func (c *client) UpdateServiceGCSafePoint(ctx context.Context, serviceID string,
TTL: ttl,
SafePoint: safePoint,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return 0, errs.ErrClientGetProtoClient
Expand Down Expand Up @@ -1273,8 +1289,7 @@ func (c *client) scatterRegionsWithGroup(ctx context.Context, regionID uint64, g
RegionId: regionID,
Group: group,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return errs.ErrClientGetProtoClient
Expand Down Expand Up @@ -1318,8 +1333,7 @@ func (c *client) SplitAndScatterRegions(ctx context.Context, splitKeys [][]byte,
RetryLimit: options.retryLimit,
}

ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
Expand All @@ -1341,8 +1355,7 @@ func (c *client) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOpe
Header: c.requestHeader(),
RegionId: regionID,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
Expand All @@ -1369,8 +1382,7 @@ func (c *client) SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...R
SplitKeys: splitKeys,
RetryLimit: options.retryLimit,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
Expand Down Expand Up @@ -1400,8 +1412,7 @@ func (c *client) scatterRegionsWithOptions(ctx context.Context, regionsID []uint
SkipStoreLimit: options.skipStoreLimit,
}

ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
Expand Down Expand Up @@ -1451,8 +1462,7 @@ func trimHTTPPrefix(str string) string {
func (c *client) LoadGlobalConfig(ctx context.Context, names []string, configPath string) ([]GlobalConfigItem, int64, error) {
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
return nil, 0, errs.ErrClientGetProtoClient
}
Expand Down Expand Up @@ -1483,8 +1493,7 @@ func (c *client) StoreGlobalConfig(ctx context.Context, configPath string, items
}
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
return errs.ErrClientGetProtoClient
}
Expand All @@ -1501,8 +1510,7 @@ func (c *client) WatchGlobalConfig(ctx context.Context, configPath string, revis
globalConfigWatcherCh := make(chan []GlobalConfigItem, 16)
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient
}
Expand Down Expand Up @@ -1550,8 +1558,7 @@ func (c *client) WatchGlobalConfig(ctx context.Context, configPath string, revis
func (c *client) GetExternalTimestamp(ctx context.Context) (uint64, error) {
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
return 0, errs.ErrClientGetProtoClient
}
Expand All @@ -1571,8 +1578,7 @@ func (c *client) GetExternalTimestamp(ctx context.Context) (uint64, error) {
func (c *client) SetExternalTimestamp(ctx context.Context, timestamp uint64) error {
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
return errs.ErrClientGetProtoClient
}
Expand Down
Loading

0 comments on commit 422ab81

Please sign in to comment.