Skip to content

Commit

Permalink
address comment
Browse files Browse the repository at this point in the history
Signed-off-by: Cabinfever_B <[email protected]>
  • Loading branch information
CabinfeverB committed Nov 24, 2023
1 parent 55be3ac commit c074c18
Show file tree
Hide file tree
Showing 20 changed files with 5,301 additions and 346 deletions.
61 changes: 30 additions & 31 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func WithBuckets() GetRegionOption {
return func(op *GetRegionOp) { op.needBuckets = true }
}

// AllowFollowerHandle
// AllowFollowerHandle means that client can send request to follower and let it handle this request.
func AllowFollowerHandle() GetRegionOption {
return func(op *GetRegionOp) { op.allowFollowerHandle = true }

Check warning on line 211 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L211

Added line #L211 was not covered by tests
}
Expand Down Expand Up @@ -711,7 +711,7 @@ func (c *client) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) {

ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
req := &pdpb.GetMembersRequest{Header: c.requestHeader()}
serviceClient, ctx := c.getClientAndContext(ctx, DefaultAPIKind, false)
serviceClient, ctx := c.getClientAndContext(ctx, defaultAPIKind, false)
protoClient := serviceClient.GetPDClient()

Check warning on line 715 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L714-L715

Added lines #L714 - L715 were not covered by tests
if protoClient == nil {
cancel()
Expand All @@ -725,15 +725,14 @@ func (c *client) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) {
return resp.GetMembers(), nil
}

func (c *client) getClientAndContext(ctx context.Context, kind ApiKind, allowFollower bool) (ServiceClient, context.Context) {
var serviceClient ServiceClient
// getClientAndContext returns serviceClient and ctx with the given parameters
func (c *client) getClientAndContext(ctx context.Context, kind apiKind, allowFollower bool) (ServiceClient, context.Context) {
if allowFollower {
serviceClient = c.pdSvcDiscovery.GetServiceClient(kind)
serviceClient := c.pdSvcDiscovery.GetServiceClient(kind)
return serviceClient, serviceClient.BuildGRPCContext(ctx, false)

Check warning on line 732 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L731-L732

Added lines #L731 - L732 were not covered by tests
} else {
serviceClient = c.pdSvcDiscovery.GetLeaderOrBackupServiceClient()
return serviceClient, serviceClient.BuildGRPCContext(ctx, true)
}
serviceClient := c.pdSvcDiscovery.GetLeaderOrBackupServiceClient()
return serviceClient, serviceClient.BuildGRPCContext(ctx, true)
}

func (c *client) GetTSAsync(ctx context.Context) TSFuture {
Expand Down Expand Up @@ -795,7 +794,7 @@ func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, e
}

// Call GetMinTS API to get the minimal TS from the API leader.
serviceClient, ctx := c.getClientAndContext(ctx, DefaultAPIKind, false)
serviceClient, ctx := c.getClientAndContext(ctx, defaultAPIKind, false)
protoClient := serviceClient.GetPDClient()
if protoClient == nil {
return 0, 0, errs.ErrClientGetProtoClient
Expand Down Expand Up @@ -901,7 +900,7 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...GetRegionOpt
RegionKey: key,
NeedBuckets: options.needBuckets,
}
serviceClient, cctx := c.getClientAndContext(ctx, RegionAPIKind, options.allowFollowerHandle && c.option.getEnableFollowerHandle())
serviceClient, cctx := c.getClientAndContext(ctx, regionAPIKind, options.allowFollowerHandle && c.option.getEnableFollowerHandle())
if serviceClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient

Check warning on line 906 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L905-L906

Added lines #L905 - L906 were not covered by tests
Expand All @@ -914,7 +913,7 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...GetRegionOpt
cancel()
if serviceClient.RespToErr(resp.GetHeader().GetError(), err) {
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
serviceClient, cctx = c.getClientAndContext(ctx, RegionAPIKind, false)
serviceClient, cctx = c.getClientAndContext(ctx, regionAPIKind, false)
if serviceClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient

Check warning on line 919 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L915-L919

Added lines #L915 - L919 were not covered by tests
Expand Down Expand Up @@ -951,7 +950,7 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegio
RegionKey: key,
NeedBuckets: options.needBuckets,
}
serviceClient, cctx := c.getClientAndContext(ctx, RegionAPIKind, options.allowFollowerHandle && c.option.getEnableFollowerHandle())
serviceClient, cctx := c.getClientAndContext(ctx, regionAPIKind, options.allowFollowerHandle && c.option.getEnableFollowerHandle())
if serviceClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
Expand All @@ -960,7 +959,7 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegio
cancel()
if serviceClient.RespToErr(resp.GetHeader().GetError(), err) {
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
serviceClient, cctx = c.getClientAndContext(ctx, RegionAPIKind, false)
serviceClient, cctx = c.getClientAndContext(ctx, regionAPIKind, false)
if serviceClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient

Check warning on line 965 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L961-L965

Added lines #L961 - L965 were not covered by tests
Expand Down Expand Up @@ -993,7 +992,7 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...Get
RegionId: regionID,
NeedBuckets: options.needBuckets,
}
serviceClient, cctx := c.getClientAndContext(ctx, RegionAPIKind, options.allowFollowerHandle && c.option.getEnableFollowerHandle())
serviceClient, cctx := c.getClientAndContext(ctx, regionAPIKind, options.allowFollowerHandle && c.option.getEnableFollowerHandle())
if serviceClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
Expand All @@ -1002,7 +1001,7 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...Get
cancel()
if serviceClient.RespToErr(resp.GetHeader().GetError(), err) {
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
serviceClient, cctx = c.getClientAndContext(ctx, RegionAPIKind, false)
serviceClient, cctx = c.getClientAndContext(ctx, regionAPIKind, false)
if serviceClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient

Check warning on line 1007 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L1003-L1007

Added lines #L1003 - L1007 were not covered by tests
Expand Down Expand Up @@ -1041,7 +1040,7 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int,
EndKey: endKey,
Limit: int32(limit),
}
serviceClient, cctx := c.getClientAndContext(scanCtx, RegionAPIKind, options.allowFollowerHandle && c.option.getEnableFollowerHandle())
serviceClient, cctx := c.getClientAndContext(scanCtx, regionAPIKind, options.allowFollowerHandle && c.option.getEnableFollowerHandle())
if serviceClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient
Expand All @@ -1050,7 +1049,7 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int,
cancel()
if serviceClient.RespToErr(resp.GetHeader().GetError(), err) {
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
serviceClient, cctx = c.getClientAndContext(ctx, RegionAPIKind, false)
serviceClient, cctx = c.getClientAndContext(ctx, regionAPIKind, false)
if serviceClient == nil {
cancel()
return nil, errs.ErrClientGetProtoClient

Check warning on line 1055 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L1051-L1055

Added lines #L1051 - L1055 were not covered by tests
Expand Down Expand Up @@ -1107,7 +1106,7 @@ func (c *client) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, e
Header: c.requestHeader(),
StoreId: storeID,
}
serviceClient, ctx := c.getClientAndContext(ctx, DefaultAPIKind, false)
serviceClient, ctx := c.getClientAndContext(ctx, defaultAPIKind, false)
protoClient := serviceClient.GetPDClient()
if protoClient == nil {
cancel()
Expand Down Expand Up @@ -1152,7 +1151,7 @@ func (c *client) GetAllStores(ctx context.Context, opts ...GetStoreOption) ([]*m
Header: c.requestHeader(),
ExcludeTombstoneStores: options.excludeTombstone,
}
serviceClient, ctx := c.getClientAndContext(ctx, DefaultAPIKind, false)
serviceClient, ctx := c.getClientAndContext(ctx, defaultAPIKind, false)
protoClient := serviceClient.GetPDClient()
if protoClient == nil {
cancel()
Expand Down Expand Up @@ -1180,7 +1179,7 @@ func (c *client) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint6
Header: c.requestHeader(),
SafePoint: safePoint,
}
serviceClient, ctx := c.getClientAndContext(ctx, DefaultAPIKind, false)
serviceClient, ctx := c.getClientAndContext(ctx, defaultAPIKind, false)
protoClient := serviceClient.GetPDClient()
if protoClient == nil {
cancel()
Expand Down Expand Up @@ -1215,7 +1214,7 @@ func (c *client) UpdateServiceGCSafePoint(ctx context.Context, serviceID string,
TTL: ttl,
SafePoint: safePoint,
}
serviceClient, ctx := c.getClientAndContext(ctx, DefaultAPIKind, false)
serviceClient, ctx := c.getClientAndContext(ctx, defaultAPIKind, false)
protoClient := serviceClient.GetPDClient()
if protoClient == nil {
cancel()
Expand Down Expand Up @@ -1248,7 +1247,7 @@ func (c *client) scatterRegionsWithGroup(ctx context.Context, regionID uint64, g
RegionId: regionID,
Group: group,
}
serviceClient, ctx := c.getClientAndContext(ctx, DefaultAPIKind, false)
serviceClient, ctx := c.getClientAndContext(ctx, defaultAPIKind, false)
protoClient := serviceClient.GetPDClient()
if protoClient == nil {
cancel()
Expand Down Expand Up @@ -1293,7 +1292,7 @@ func (c *client) SplitAndScatterRegions(ctx context.Context, splitKeys [][]byte,
RetryLimit: options.retryLimit,
}

serviceClient, ctx := c.getClientAndContext(ctx, DefaultAPIKind, false)
serviceClient, ctx := c.getClientAndContext(ctx, defaultAPIKind, false)
protoClient := serviceClient.GetPDClient()

Check warning on line 1296 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L1295-L1296

Added lines #L1295 - L1296 were not covered by tests
if protoClient == nil {
cancel()
Expand All @@ -1316,7 +1315,7 @@ func (c *client) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOpe
Header: c.requestHeader(),
RegionId: regionID,
}
serviceClient, ctx := c.getClientAndContext(ctx, DefaultAPIKind, false)
serviceClient, ctx := c.getClientAndContext(ctx, defaultAPIKind, false)
protoClient := serviceClient.GetPDClient()
if protoClient == nil {
cancel()
Expand Down Expand Up @@ -1344,7 +1343,7 @@ func (c *client) SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...R
SplitKeys: splitKeys,
RetryLimit: options.retryLimit,
}
serviceClient, ctx := c.getClientAndContext(ctx, DefaultAPIKind, false)
serviceClient, ctx := c.getClientAndContext(ctx, defaultAPIKind, false)
protoClient := serviceClient.GetPDClient()

Check warning on line 1347 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L1346-L1347

Added lines #L1346 - L1347 were not covered by tests
if protoClient == nil {
cancel()
Expand Down Expand Up @@ -1375,7 +1374,7 @@ func (c *client) scatterRegionsWithOptions(ctx context.Context, regionsID []uint
SkipStoreLimit: options.skipStoreLimit,
}

serviceClient, ctx := c.getClientAndContext(ctx, DefaultAPIKind, false)
serviceClient, ctx := c.getClientAndContext(ctx, defaultAPIKind, false)
protoClient := serviceClient.GetPDClient()
if protoClient == nil {
cancel()
Expand Down Expand Up @@ -1426,7 +1425,7 @@ func trimHTTPPrefix(str string) string {
func (c *client) LoadGlobalConfig(ctx context.Context, names []string, configPath string) ([]GlobalConfigItem, int64, error) {
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()
serviceClient, ctx := c.getClientAndContext(ctx, DefaultAPIKind, false)
serviceClient, ctx := c.getClientAndContext(ctx, defaultAPIKind, false)
protoClient := serviceClient.GetPDClient()
if protoClient == nil {
return nil, 0, errs.ErrClientGetProtoClient
Expand Down Expand Up @@ -1458,7 +1457,7 @@ func (c *client) StoreGlobalConfig(ctx context.Context, configPath string, items
}
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()
serviceClient, ctx := c.getClientAndContext(ctx, DefaultAPIKind, false)
serviceClient, ctx := c.getClientAndContext(ctx, defaultAPIKind, false)
protoClient := serviceClient.GetPDClient()
if protoClient == nil {
return errs.ErrClientGetProtoClient
Expand All @@ -1476,7 +1475,7 @@ func (c *client) WatchGlobalConfig(ctx context.Context, configPath string, revis
globalConfigWatcherCh := make(chan []GlobalConfigItem, 16)
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()
serviceClient, ctx := c.getClientAndContext(ctx, DefaultAPIKind, false)
serviceClient, ctx := c.getClientAndContext(ctx, defaultAPIKind, false)
protoClient := serviceClient.GetPDClient()
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient
Expand Down Expand Up @@ -1525,7 +1524,7 @@ func (c *client) WatchGlobalConfig(ctx context.Context, configPath string, revis
func (c *client) GetExternalTimestamp(ctx context.Context) (uint64, error) {
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()
serviceClient, ctx := c.getClientAndContext(ctx, DefaultAPIKind, false)
serviceClient, ctx := c.getClientAndContext(ctx, defaultAPIKind, false)
protoClient := serviceClient.GetPDClient()
if protoClient == nil {
return 0, errs.ErrClientGetProtoClient
Expand All @@ -1546,7 +1545,7 @@ func (c *client) GetExternalTimestamp(ctx context.Context) (uint64, error) {
func (c *client) SetExternalTimestamp(ctx context.Context, timestamp uint64) error {
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()
serviceClient, ctx := c.getClientAndContext(ctx, DefaultAPIKind, false)
serviceClient, ctx := c.getClientAndContext(ctx, defaultAPIKind, false)
protoClient := serviceClient.GetPDClient()
if protoClient == nil {
return errs.ErrClientGetProtoClient
Expand Down
6 changes: 3 additions & 3 deletions client/gc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (c *client) UpdateGCSafePointV2(ctx context.Context, keyspaceID uint32, saf
KeyspaceId: keyspaceID,
SafePoint: safePoint,
}
serviceClient, ctx := c.getClientAndContext(ctx, DefaultAPIKind, false)
serviceClient, ctx := c.getClientAndContext(ctx, defaultAPIKind, false)
protoClient := serviceClient.GetPDClient()
if protoClient == nil {
cancel()
Expand Down Expand Up @@ -79,7 +79,7 @@ func (c *client) UpdateServiceSafePointV2(ctx context.Context, keyspaceID uint32
SafePoint: safePoint,
Ttl: ttl,
}
serviceClient, ctx := c.getClientAndContext(ctx, DefaultAPIKind, false)
serviceClient, ctx := c.getClientAndContext(ctx, defaultAPIKind, false)
protoClient := serviceClient.GetPDClient()

Check warning on line 83 in client/gc_client.go

View check run for this annotation

Codecov / codecov/patch

client/gc_client.go#L82-L83

Added lines #L82 - L83 were not covered by tests
if protoClient == nil {
cancel()
Expand All @@ -103,7 +103,7 @@ func (c *client) WatchGCSafePointV2(ctx context.Context, revision int64) (chan [

ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()
serviceClient, ctx := c.getClientAndContext(ctx, DefaultAPIKind, false)
serviceClient, ctx := c.getClientAndContext(ctx, defaultAPIKind, false)
protoClient := serviceClient.GetPDClient()
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient
Expand Down
4 changes: 2 additions & 2 deletions client/grpcutil/grpcutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ func BuildForwardContext(ctx context.Context, addr string) context.Context {
// BuildFollowerHandleContext creates a context with follower handle metadata information.
// It is used in client side.
// TODO: do we need the address?
func BuildFollowerHandleContext(ctx context.Context, addr string) context.Context {
md := metadata.Pairs(FollowerHandleMetadataKey, addr)
func BuildFollowerHandleContext(ctx context.Context) context.Context {
md := metadata.Pairs(FollowerHandleMetadataKey, "")
return metadata.NewOutgoingContext(ctx, md)

Check warning on line 83 in client/grpcutil/grpcutil.go

View check run for this annotation

Codecov / codecov/patch

client/grpcutil/grpcutil.go#L82-L83

Added lines #L82 - L83 were not covered by tests
}

Expand Down
2 changes: 1 addition & 1 deletion client/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ const (
// EnableTSOFollowerProxy is the TSO Follower Proxy option.
// It is stored as bool.
EnableTSOFollowerProxy

// EnableFollowerHandle is the follower handle option.
EnableFollowerHandle

dynamicOptionCount
Expand Down
Loading

0 comments on commit c074c18

Please sign in to comment.