Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: follower support to handle GetRegion and other region api #7432

Merged
merged 27 commits into from
Dec 29, 2023
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
7e725fa
add service client
CabinfeverB Dec 4, 2023
889c0eb
add service client
CabinfeverB Dec 4, 2023
6315793
address comment
CabinfeverB Dec 4, 2023
fa0f510
Merge remote-tracking branch 'upstream/master' into follower/refactor…
CabinfeverB Dec 4, 2023
6535c6e
address comment
CabinfeverB Dec 5, 2023
423717c
address comment
CabinfeverB Dec 5, 2023
013fbce
Merge branch 'master' into follower/service_clinet
CabinfeverB Dec 5, 2023
943331e
Merge branch 'follower/service_clinet' into follower/refactor_forward
CabinfeverB Dec 5, 2023
0891cf1
address comment
CabinfeverB Dec 5, 2023
9322e4e
Merge branch 'follower/service_clinet' into follower/refactor_forward
CabinfeverB Dec 5, 2023
739deba
impl follower region
CabinfeverB Dec 5, 2023
01a983b
impl follower region
CabinfeverB Dec 6, 2023
bf4c764
merge master
CabinfeverB Dec 22, 2023
87efe03
merge master
CabinfeverB Dec 22, 2023
eac3646
merge master
CabinfeverB Dec 22, 2023
ac675a9
merge master
CabinfeverB Dec 22, 2023
82657d4
use ServiceClient in pd discovery
CabinfeverB Dec 22, 2023
a2d7dfb
use ServiceClient in pd discovery
CabinfeverB Dec 22, 2023
b2c0b16
address comment
CabinfeverB Dec 25, 2023
d5af768
merge master
CabinfeverB Dec 26, 2023
5ef3716
merge master
CabinfeverB Dec 26, 2023
d5ca4c6
merge master
CabinfeverB Dec 26, 2023
fad91de
Merge branch 'master' into follower/refactor_forward
CabinfeverB Dec 28, 2023
c1878d2
Merge branch 'master' into follower/refactor_forward
CabinfeverB Dec 28, 2023
c742353
address comment
CabinfeverB Dec 28, 2023
4de4d48
Merge branch 'master' into follower/refactor_forward
ti-chi-bot[bot] Dec 29, 2023
ff657fd
Merge branch 'master' into follower/refactor_forward
ti-chi-bot[bot] Dec 29, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 66 additions & 21 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@
type client struct {
keyspaceID uint32
svrUrls []string
pdSvcDiscovery ServiceDiscovery
pdSvcDiscovery *pdServiceDiscovery
tokenDispatcher *tokenDispatcher

// For service mode switching.
Expand Down Expand Up @@ -503,7 +503,7 @@
return err
}
// c.keyspaceID is the source of truth for keyspace id.
c.pdSvcDiscovery.(*pdServiceDiscovery).SetKeyspaceID(c.keyspaceID)
c.pdSvcDiscovery.SetKeyspaceID(c.keyspaceID)
return nil
}

Expand Down Expand Up @@ -733,6 +733,23 @@
return pdpb.NewPDClient(serviceClient.GetClientConn()), serviceClient.BuildGRPCTargetContext(ctx, true)
}

// getClientAndContext returns the leader pd client and the original context. If leader is unhealthy, it returns
// follower pd client and the context which holds forward information.
func (c *client) getRegionAPIClientAndContext(ctx context.Context, allowFollower bool) (ServiceClient, context.Context) {
var serviceClient ServiceClient
if allowFollower {
serviceClient = c.pdSvcDiscovery.getServiceClientByKind(regionAPIKind)
if serviceClient != nil {
return serviceClient, serviceClient.BuildGRPCTargetContext(ctx, !allowFollower)
}
}
serviceClient = c.pdSvcDiscovery.GetServiceClient()
if serviceClient == nil {
return nil, ctx

Check warning on line 748 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L748

Added line #L748 was not covered by tests
}
return serviceClient, serviceClient.BuildGRPCTargetContext(ctx, !allowFollower)
}

func (c *client) GetTSAsync(ctx context.Context) TSFuture {
return c.GetLocalTSAsync(ctx, globalDCLocation)
}
Expand Down Expand Up @@ -885,6 +902,7 @@
start := time.Now()
defer func() { cmdDurationGetRegion.Observe(time.Since(start).Seconds()) }()
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()

options := &GetRegionOp{}
for _, opt := range opts {
Expand All @@ -895,13 +913,18 @@
RegionKey: key,
NeedBuckets: options.needBuckets,
}
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
serviceClient, cctx := c.getRegionAPIClientAndContext(ctx, options.allowFollowerHandle && c.option.getEnableFollowerHandle())
if serviceClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err := protoClient.GetRegion(ctx, req)
cancel()
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetRegion(cctx, req)
if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) {
protoClient, cctx := c.getClientAndContext(ctx)
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient

Check warning on line 924 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L924

Added line #L924 was not covered by tests
}
resp, err = protoClient.GetRegion(cctx, req)
}

if err = c.respForErr(cmdFailDurationGetRegion, start, err, resp.GetHeader()); err != nil {
return nil, err
Expand All @@ -917,6 +940,7 @@
start := time.Now()
defer func() { cmdDurationGetPrevRegion.Observe(time.Since(start).Seconds()) }()
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()

options := &GetRegionOp{}
for _, opt := range opts {
Expand All @@ -927,13 +951,18 @@
RegionKey: key,
NeedBuckets: options.needBuckets,
}
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
serviceClient, cctx := c.getRegionAPIClientAndContext(ctx, options.allowFollowerHandle && c.option.getEnableFollowerHandle())
if serviceClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err := protoClient.GetPrevRegion(ctx, req)
cancel()
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetPrevRegion(cctx, req)
if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) {
protoClient, cctx := c.getClientAndContext(ctx)
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient

Check warning on line 962 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L960-L962

Added lines #L960 - L962 were not covered by tests
}
resp, err = protoClient.GetPrevRegion(cctx, req)

Check warning on line 964 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L964

Added line #L964 was not covered by tests
}

if err = c.respForErr(cmdFailDurationGetPrevRegion, start, err, resp.GetHeader()); err != nil {
return nil, err
Expand All @@ -949,6 +978,7 @@
start := time.Now()
defer func() { cmdDurationGetRegionByID.Observe(time.Since(start).Seconds()) }()
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()

options := &GetRegionOp{}
for _, opt := range opts {
Expand All @@ -959,13 +989,18 @@
RegionId: regionID,
NeedBuckets: options.needBuckets,
}
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
serviceClient, cctx := c.getRegionAPIClientAndContext(ctx, options.allowFollowerHandle && c.option.getEnableFollowerHandle())
if serviceClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err := protoClient.GetRegionByID(ctx, req)
cancel()
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetRegionByID(cctx, req)
if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) {
protoClient, cctx := c.getClientAndContext(ctx)
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient

Check warning on line 1000 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L998-L1000

Added lines #L998 - L1000 were not covered by tests
}
resp, err = protoClient.GetRegionByID(cctx, req)

Check warning on line 1002 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L1002

Added line #L1002 was not covered by tests
}

if err = c.respForErr(cmdFailedDurationGetRegionByID, start, err, resp.GetHeader()); err != nil {
return nil, err
Expand All @@ -987,18 +1022,28 @@
scanCtx, cancel = context.WithTimeout(ctx, c.option.timeout)
defer cancel()
}
options := &GetRegionOp{}
for _, opt := range opts {
opt(options)

Check warning on line 1027 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L1027

Added line #L1027 was not covered by tests
}
req := &pdpb.ScanRegionsRequest{
Header: c.requestHeader(),
StartKey: key,
EndKey: endKey,
Limit: int32(limit),
}
protoClient, scanCtx := c.getClientAndContext(scanCtx)
if protoClient == nil {
cancel()
serviceClient, cctx := c.getRegionAPIClientAndContext(scanCtx, options.allowFollowerHandle && c.option.getEnableFollowerHandle())
if serviceClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err := protoClient.ScanRegions(scanCtx, req)
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).ScanRegions(cctx, req)
if !serviceClient.IsConnectedToLeader() && err != nil || resp.Header.GetError() != nil {
protoClient, cctx := c.getClientAndContext(scanCtx)
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient

Check warning on line 1043 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L1041-L1043

Added lines #L1041 - L1043 were not covered by tests
}
resp, err = protoClient.ScanRegions(cctx, req)

Check warning on line 1045 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L1045

Added line #L1045 was not covered by tests
}

if err = c.respForErr(cmdFailedDurationScanRegions, start, err, resp.GetHeader()); err != nil {
return nil, err
Expand Down
4 changes: 3 additions & 1 deletion client/pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type apiKind int

const (
forwardAPIKind apiKind = iota
regionAPIKind
apiKindCount
)

Expand Down Expand Up @@ -445,7 +446,7 @@ func newPDServiceDiscovery(
ctx: ctx,
cancel: cancel,
wg: wg,
apiCandidateNodes: [apiKindCount]*pdServiceBalancer{newPDServiceBalancer(emptyErrorFn)},
apiCandidateNodes: [apiKindCount]*pdServiceBalancer{newPDServiceBalancer(emptyErrorFn), newPDServiceBalancer(regionAPIErrorFn)},
serviceModeUpdateCb: serviceModeUpdateCb,
updateKeyspaceIDCb: updateKeyspaceIDCb,
keyspaceID: keyspaceID,
Expand Down Expand Up @@ -563,6 +564,7 @@ func (c *pdServiceDiscovery) updateServiceModeLoop() {
}
}
}

func (c *pdServiceDiscovery) memberHealthCheckLoop() {
defer c.wg.Done()

Expand Down
15 changes: 14 additions & 1 deletion pkg/utils/grpcutil/grpcutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
const (
// ForwardMetadataKey is used to record the forwarded host of PD.
ForwardMetadataKey = "pd-forwarded-host"
// FollowerHandleMetadataKey is used to mark the permit of follower handle.
FollowerHandleMetadataKey = "pd-allow-follower-handle"
)

// TLSConfig is the configuration for supporting tls.
Expand Down Expand Up @@ -173,7 +175,7 @@
func GetForwardedHost(ctx context.Context) string {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
log.Debug("failed to get forwarding metadata")
log.Debug("failed to get gRPC incoming metadata when getting forwarded host")
return ""
}
if t, ok := md[ForwardMetadataKey]; ok {
Expand All @@ -182,6 +184,17 @@
return ""
}

// IsFollowerHandleEnabled returns the follower host in metadata.
func IsFollowerHandleEnabled(ctx context.Context) bool {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
log.Debug("failed to get gRPC incoming metadata when checking follower handle is enabled")
return false

Check warning on line 192 in pkg/utils/grpcutil/grpcutil.go

View check run for this annotation

Codecov / codecov/patch

pkg/utils/grpcutil/grpcutil.go#L191-L192

Added lines #L191 - L192 were not covered by tests
}
_, ok = md[FollowerHandleMetadataKey]
return ok
}

func establish(ctx context.Context, addr string, tlsConfig *TLSConfig, do ...grpc.DialOption) (*grpc.ClientConn, error) {
tlsCfg, err := tlsConfig.ToTLSConfig()
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions server/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,16 +384,16 @@ func (s *GrpcServer) getForwardedHost(ctx, streamCtx context.Context, serviceNam
return forwardedHost, nil
}

func (s *GrpcServer) isLocalRequest(forwardedHost string) bool {
func (s *GrpcServer) isLocalRequest(host string) bool {
failpoint.Inject("useForwardRequest", func() {
failpoint.Return(false)
})
if forwardedHost == "" {
if host == "" {
return true
}
memberAddrs := s.GetMember().Member().GetClientUrls()
for _, addr := range memberAddrs {
if addr == forwardedHost {
if addr == host {
return true
}
}
Expand Down
Loading
Loading