From 642c989c1ac65137a3ecbfbb97093209ade91459 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Fri, 29 Nov 2024 17:43:17 +0800 Subject: [PATCH] address the comment Signed-off-by: Ryan Leung --- client/client.go | 62 +++++++++------------------ client/meta_storage_client.go | 7 +-- client/pkg/utils/grpcutil/grpcutil.go | 24 +---------- 3 files changed, 24 insertions(+), 69 deletions(-) diff --git a/client/client.go b/client/client.go index 3a8b43d0a19..5c05797eac7 100644 --- a/client/client.go +++ b/client/client.go @@ -38,8 +38,6 @@ import ( "github.com/tikv/pd/client/metrics" "github.com/tikv/pd/client/opt" "github.com/tikv/pd/client/pkg/caller" - "github.com/tikv/pd/client/pkg/retry" - "github.com/tikv/pd/client/pkg/utils/grpcutil" "github.com/tikv/pd/client/pkg/utils/tlsutil" sd "github.com/tikv/pd/client/servicediscovery" "go.uber.org/zap" @@ -541,8 +539,7 @@ func (c *client) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) { if protoClient == nil { return nil, errs.ErrClientGetProtoClient } - bo := retry.FromContext(ctx) - resp, err := protoClient.GetMembers(ctx, req, grpcutil.WithBackoffer(bo)) + resp, err := protoClient.GetMembers(ctx, req) if err = c.respForErr(metrics.CmdFailedDurationGetAllMembers, start, err, resp.GetHeader()); err != nil { return nil, err } @@ -614,10 +611,9 @@ func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, e return 0, 0, errs.ErrClientGetProtoClient } - bo := retry.FromContext(ctx) resp, err := protoClient.GetMinTS(ctx, &pdpb.GetMinTSRequest{ Header: c.requestHeader(), - }, grpcutil.WithBackoffer(bo)) + }) if err != nil { if strings.Contains(err.Error(), "Unimplemented") { // If the method is not supported, we fallback to GetTS. @@ -721,8 +717,7 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegio return nil, errs.ErrClientGetProtoClient } - bo := retry.FromContext(ctx) - resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetRegion(cctx, req, grpcutil.WithBackoffer(bo)) + resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetRegion(cctx, req) if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) { protoClient, cctx := c.getClientAndContext(ctx) if protoClient == nil { @@ -761,8 +756,7 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetR if serviceClient == nil { return nil, errs.ErrClientGetProtoClient } - bo := retry.FromContext(ctx) - resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetPrevRegion(cctx, req, grpcutil.WithBackoffer(bo)) + resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetPrevRegion(cctx, req) if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) { protoClient, cctx := c.getClientAndContext(ctx) if protoClient == nil { @@ -802,8 +796,7 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...opt if serviceClient == nil { return nil, errs.ErrClientGetProtoClient } - bo := retry.FromContext(ctx) - resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetRegionByID(cctx, req, grpcutil.WithBackoffer(bo)) + resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetRegionByID(cctx, req) if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) { protoClient, cctx := c.getClientAndContext(ctx) if protoClient == nil { @@ -848,9 +841,8 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int, if serviceClient == nil { return nil, errs.ErrClientGetProtoClient } - bo := retry.FromContext(ctx) //nolint:staticcheck - resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).ScanRegions(cctx, req, grpcutil.WithBackoffer(bo)) + resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).ScanRegions(cctx, req) failpoint.Inject("responseNil", func() { resp = nil }) @@ -905,8 +897,7 @@ func (c *client) BatchScanRegions(ctx context.Context, ranges []KeyRange, limit if serviceClient == nil { return nil, errs.ErrClientGetProtoClient } - bo := retry.FromContext(ctx) - resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).BatchScanRegions(cctx, req, grpcutil.WithBackoffer(bo)) + resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).BatchScanRegions(cctx, req) failpoint.Inject("responseNil", func() { resp = nil }) @@ -990,8 +981,7 @@ func (c *client) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, e if protoClient == nil { return nil, errs.ErrClientGetProtoClient } - bo := retry.FromContext(ctx) - resp, err := protoClient.GetStore(ctx, req, grpcutil.WithBackoffer(bo)) + resp, err := protoClient.GetStore(ctx, req) if err = c.respForErr(metrics.CmdFailedDurationGetStore, start, err, resp.GetHeader()); err != nil { return nil, err @@ -1035,8 +1025,7 @@ func (c *client) GetAllStores(ctx context.Context, opts ...opt.GetStoreOption) ( if protoClient == nil { return nil, errs.ErrClientGetProtoClient } - bo := retry.FromContext(ctx) - resp, err := protoClient.GetAllStores(ctx, req, grpcutil.WithBackoffer(bo)) + resp, err := protoClient.GetAllStores(ctx, req) if err = c.respForErr(metrics.CmdFailedDurationGetAllStores, start, err, resp.GetHeader()); err != nil { return nil, err @@ -1063,8 +1052,7 @@ func (c *client) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint6 if protoClient == nil { return 0, errs.ErrClientGetProtoClient } - bo := retry.FromContext(ctx) - resp, err := protoClient.UpdateGCSafePoint(ctx, req, grpcutil.WithBackoffer(bo)) + resp, err := protoClient.UpdateGCSafePoint(ctx, req) if err = c.respForErr(metrics.CmdFailedDurationUpdateGCSafePoint, start, err, resp.GetHeader()); err != nil { return 0, err @@ -1097,8 +1085,7 @@ func (c *client) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, if protoClient == nil { return 0, errs.ErrClientGetProtoClient } - bo := retry.FromContext(ctx) - resp, err := protoClient.UpdateServiceGCSafePoint(ctx, req, grpcutil.WithBackoffer(bo)) + resp, err := protoClient.UpdateServiceGCSafePoint(ctx, req) if err = c.respForErr(metrics.CmdFailedDurationUpdateServiceGCSafePoint, start, err, resp.GetHeader()); err != nil { return 0, err @@ -1130,8 +1117,7 @@ func (c *client) scatterRegionsWithGroup(ctx context.Context, regionID uint64, g if protoClient == nil { return errs.ErrClientGetProtoClient } - bo := retry.FromContext(ctx) - resp, err := protoClient.ScatterRegion(ctx, req, grpcutil.WithBackoffer(bo)) + resp, err := protoClient.ScatterRegion(ctx, req) if err != nil { return err } @@ -1175,8 +1161,7 @@ func (c *client) SplitAndScatterRegions(ctx context.Context, splitKeys [][]byte, if protoClient == nil { return nil, errs.ErrClientGetProtoClient } - bo := retry.FromContext(ctx) - return protoClient.SplitAndScatterRegions(ctx, req, grpcutil.WithBackoffer(bo)) + return protoClient.SplitAndScatterRegions(ctx, req) } // GetOperator implements the RPCClient interface. @@ -1198,8 +1183,7 @@ func (c *client) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOpe if protoClient == nil { return nil, errs.ErrClientGetProtoClient } - bo := retry.FromContext(ctx) - return protoClient.GetOperator(ctx, req, grpcutil.WithBackoffer(bo)) + return protoClient.GetOperator(ctx, req) } // SplitRegions split regions by given split keys @@ -1225,8 +1209,7 @@ func (c *client) SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...o if protoClient == nil { return nil, errs.ErrClientGetProtoClient } - bo := retry.FromContext(ctx) - return protoClient.SplitRegions(ctx, req, grpcutil.WithBackoffer(bo)) + return protoClient.SplitRegions(ctx, req) } func (c *client) requestHeader() *pdpb.RequestHeader { @@ -1258,8 +1241,7 @@ func (c *client) scatterRegionsWithOptions(ctx context.Context, regionsID []uint if protoClient == nil { return nil, errs.ErrClientGetProtoClient } - bo := retry.FromContext(ctx) - resp, err := protoClient.ScatterRegion(ctx, req, grpcutil.WithBackoffer(bo)) + resp, err := protoClient.ScatterRegion(ctx, req) if err != nil { return nil, err @@ -1278,8 +1260,7 @@ func (c *client) LoadGlobalConfig(ctx context.Context, names []string, configPat if protoClient == nil { return nil, 0, errs.ErrClientGetProtoClient } - bo := retry.FromContext(ctx) - resp, err := protoClient.LoadGlobalConfig(ctx, &pdpb.LoadGlobalConfigRequest{Names: names, ConfigPath: configPath}, grpcutil.WithBackoffer(bo)) + resp, err := protoClient.LoadGlobalConfig(ctx, &pdpb.LoadGlobalConfigRequest{Names: names, ConfigPath: configPath}) if err != nil { return nil, 0, err } @@ -1311,8 +1292,7 @@ func (c *client) StoreGlobalConfig(ctx context.Context, configPath string, items if protoClient == nil { return errs.ErrClientGetProtoClient } - bo := retry.FromContext(ctx) - _, err := protoClient.StoreGlobalConfig(ctx, &pdpb.StoreGlobalConfigRequest{Changes: resArr, ConfigPath: configPath}, grpcutil.WithBackoffer(bo)) + _, err := protoClient.StoreGlobalConfig(ctx, &pdpb.StoreGlobalConfigRequest{Changes: resArr, ConfigPath: configPath}) if err != nil { return err } @@ -1379,10 +1359,9 @@ func (c *client) GetExternalTimestamp(ctx context.Context) (uint64, error) { if protoClient == nil { return 0, errs.ErrClientGetProtoClient } - bo := retry.FromContext(ctx) resp, err := protoClient.GetExternalTimestamp(ctx, &pdpb.GetExternalTimestampRequest{ Header: c.requestHeader(), - }, grpcutil.WithBackoffer(bo)) + }) if err != nil { return 0, err } @@ -1401,11 +1380,10 @@ func (c *client) SetExternalTimestamp(ctx context.Context, timestamp uint64) err if protoClient == nil { return errs.ErrClientGetProtoClient } - bo := retry.FromContext(ctx) resp, err := protoClient.SetExternalTimestamp(ctx, &pdpb.SetExternalTimestampRequest{ Header: c.requestHeader(), Timestamp: timestamp, - }, grpcutil.WithBackoffer(bo)) + }) if err != nil { return err } diff --git a/client/meta_storage_client.go b/client/meta_storage_client.go index 3e7700140d3..45bcb8d3d65 100644 --- a/client/meta_storage_client.go +++ b/client/meta_storage_client.go @@ -26,7 +26,6 @@ import ( "github.com/tikv/pd/client/errs" "github.com/tikv/pd/client/metrics" "github.com/tikv/pd/client/opt" - "github.com/tikv/pd/client/pkg/retry" "github.com/tikv/pd/client/pkg/utils/grpcutil" ) @@ -79,8 +78,7 @@ func (c *innerClient) Put(ctx context.Context, key, value []byte, opts ...opt.Me cancel() return nil, errs.ErrClientGetMetaStorageClient } - bo := retry.FromContext(ctx) - resp, err := cli.Put(ctx, req, grpcutil.WithBackoffer(bo)) + resp, err := cli.Put(ctx, req) cancel() if err = c.respForMetaStorageErr(metrics.CmdFailedDurationPut, start, err, resp.GetHeader()); err != nil { @@ -119,8 +117,7 @@ func (c *innerClient) Get(ctx context.Context, key []byte, opts ...opt.MetaStora cancel() return nil, errs.ErrClientGetMetaStorageClient } - bo := retry.FromContext(ctx) - resp, err := cli.Get(ctx, req, grpcutil.WithBackoffer(bo)) + resp, err := cli.Get(ctx, req) cancel() if err = c.respForMetaStorageErr(metrics.CmdFailedDurationGet, start, err, resp.GetHeader()); err != nil { diff --git a/client/pkg/utils/grpcutil/grpcutil.go b/client/pkg/utils/grpcutil/grpcutil.go index 94dad8748e4..29967263dd3 100644 --- a/client/pkg/utils/grpcutil/grpcutil.go +++ b/client/pkg/utils/grpcutil/grpcutil.go @@ -42,30 +42,10 @@ const ( FollowerHandleMetadataKey = "pd-allow-follower-handle" ) -// Add retry related CallOption -type retryCallOption struct { - grpc.EmptyCallOption - bo *retry.Backoffer -} - -// WithBackoffer returns a CallOption that adds a backoffer to the call. -func WithBackoffer(bo *retry.Backoffer) grpc.CallOption { - return &retryCallOption{bo: bo} -} - -func getBackofferFromCallOptions(opts []grpc.CallOption) *retry.Backoffer { - for _, opt := range opts { - if bo, ok := opt.(*retryCallOption); ok { - return bo.bo - } - } - return nil -} - // UnaryBackofferInterceptor is a gRPC interceptor that adds a backoffer to the call. func UnaryBackofferInterceptor() grpc.UnaryClientInterceptor { return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { - bo := getBackofferFromCallOptions(opts) + bo := retry.FromContext(ctx) if bo == nil { return invoker(ctx, method, req, reply, cc, opts...) } @@ -113,7 +93,7 @@ func GetClientConn(ctx context.Context, addr string, tlsCfg *tls.Config, do ...g return nil, errs.ErrURLParse.Wrap(err).GenWithStackByCause() } - // Add retry interceptor + // Add backoffer interceptor retryOpt := grpc.WithUnaryInterceptor(UnaryBackofferInterceptor()) // Add retry related connection parameters