Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: add follower option #7465

Merged
merged 4 commits into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ type Client interface {
// client should retry later.
GetRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error)
// GetRegionFromMember gets a region from certain members.
GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string) (*Region, error)
GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, opts ...GetRegionOption) (*Region, error)
// GetPrevRegion gets the previous region and its leader Peer of the region where the key is located.
GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error)
// GetRegionByID gets a region and its leader Peer from PD by id.
Expand All @@ -100,7 +100,7 @@ type Client interface {
// Limit limits the maximum number of regions returned.
// If a region has no leader, corresponding leader will be placed by a peer
// with empty value (PeerID is 0).
ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*Region, error)
ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...GetRegionOption) ([]*Region, error)
// GetStore gets a store from PD by store id.
// The store may expire later. Caller is responsible for caching and taking care
// of store change.
Expand Down Expand Up @@ -200,7 +200,8 @@ func WithSkipStoreLimit() RegionsOption {

// GetRegionOp represents available options when getting regions.
type GetRegionOp struct {
needBuckets bool
needBuckets bool
allowFollowerHandle bool
}

// GetRegionOption configures GetRegionOp.
Expand All @@ -211,6 +212,11 @@ func WithBuckets() GetRegionOption {
return func(op *GetRegionOp) { op.needBuckets = true }
}

// WithAllowFollowerHandle means that client can send request to follower and let it handle this request.
func WithAllowFollowerHandle() GetRegionOption {
return func(op *GetRegionOp) { op.allowFollowerHandle = true }
}

// LeaderHealthCheckInterval might be changed in the unit to shorten the testing time.
var LeaderHealthCheckInterval = time.Second

Expand Down Expand Up @@ -701,6 +707,12 @@ func (c *client) UpdateOption(option DynamicOption, value interface{}) error {
return errors.New("[pd] invalid value type for EnableTSOFollowerProxy option, it should be bool")
}
c.option.setEnableTSOFollowerProxy(enable)
case EnableFollowerHandle:
enable, ok := value.(bool)
if !ok {
return errors.New("[pd] invalid value type for EnableFollowerHandle option, it should be bool")
}
c.option.setEnableFollowerHandle(enable)
default:
return errors.New("[pd] unsupported client option")
}
Expand Down Expand Up @@ -952,7 +964,7 @@ func isNetworkError(code codes.Code) bool {
return code == codes.Unavailable || code == codes.DeadlineExceeded
}

func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string) (*Region, error) {
func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, opts ...GetRegionOption) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetRegionFromMember", opentracing.ChildOf(span.Context()))
defer span.Finish()
Expand Down Expand Up @@ -1056,7 +1068,7 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...Get
return handleRegionResponse(resp), nil
}

func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*Region, error) {
func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...GetRegionOption) ([]*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.ScanRegions", opentracing.ChildOf(span.Context()))
defer span.Finish()
Expand Down
17 changes: 17 additions & 0 deletions client/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const (
maxInitClusterRetries = 100
defaultMaxTSOBatchWaitInterval time.Duration = 0
defaultEnableTSOFollowerProxy = false
defaultEnableFollowerHandle = false
)

// DynamicOption is used to distinguish the dynamic option type.
Expand All @@ -40,6 +41,8 @@ const (
// EnableTSOFollowerProxy is the TSO Follower Proxy option.
// It is stored as bool.
EnableTSOFollowerProxy
// EnableFollowerHandle is the follower handle option.
EnableFollowerHandle

dynamicOptionCount
)
Expand Down Expand Up @@ -72,6 +75,7 @@ func newOption() *option {

co.dynamicOptions[MaxTSOBatchWaitInterval].Store(defaultMaxTSOBatchWaitInterval)
co.dynamicOptions[EnableTSOFollowerProxy].Store(defaultEnableTSOFollowerProxy)
co.dynamicOptions[EnableFollowerHandle].Store(defaultEnableFollowerHandle)
return co
}

Expand All @@ -88,6 +92,19 @@ func (o *option) setMaxTSOBatchWaitInterval(interval time.Duration) error {
return nil
}

// setEnableFollowerHandle set the Follower Handle option.
func (o *option) setEnableFollowerHandle(enable bool) {
old := o.getEnableFollowerHandle()
if enable != old {
o.dynamicOptions[EnableFollowerHandle].Store(enable)
}
}

// getMaxTSOBatchWaitInterval gets the Follower Handle enable option.
func (o *option) getEnableFollowerHandle() bool {
return o.dynamicOptions[EnableFollowerHandle].Load().(bool)
}

// getMaxTSOBatchWaitInterval gets the max TSO batch wait interval option.
func (o *option) getMaxTSOBatchWaitInterval() time.Duration {
return o.dynamicOptions[MaxTSOBatchWaitInterval].Load().(time.Duration)
Expand Down
8 changes: 8 additions & 0 deletions client/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func TestDynamicOptionChange(t *testing.T) {
// Check the default value setting.
re.Equal(defaultMaxTSOBatchWaitInterval, o.getMaxTSOBatchWaitInterval())
re.Equal(defaultEnableTSOFollowerProxy, o.getEnableTSOFollowerProxy())
re.Equal(defaultEnableFollowerHandle, o.getEnableFollowerHandle())

// Check the invalid value setting.
re.NotNil(o.setMaxTSOBatchWaitInterval(time.Second))
Expand Down Expand Up @@ -55,4 +56,11 @@ func TestDynamicOptionChange(t *testing.T) {
close(o.enableTSOFollowerProxyCh)
// Setting the same value should not notify the channel.
o.setEnableTSOFollowerProxy(expectBool)

expectBool = true
o.setEnableFollowerHandle(expectBool)
re.Equal(expectBool, o.getEnableFollowerHandle())
expectBool = false
o.setEnableFollowerHandle(expectBool)
re.Equal(expectBool, o.getEnableFollowerHandle())
}
Loading