Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client/http: encapsulate rule-related PD HTTP interfaces #7397

Merged
merged 3 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion client/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const (
regionsByKey = "/pd/api/v1/regions/key"
RegionsByStoreIDPrefix = "/pd/api/v1/regions/store"
EmptyRegions = "/pd/api/v1/regions/check/empty-region"
accelerateSchedule = "/pd/api/v1/regions/accelerate-schedule"
AccelerateSchedule = "/pd/api/v1/regions/accelerate-schedule"
store = "/pd/api/v1/store"
Stores = "/pd/api/v1/stores"
StatsRegion = "/pd/api/v1/stats/region"
Expand All @@ -45,7 +45,10 @@ const (
PlacementRule = "/pd/api/v1/config/rule"
PlacementRules = "/pd/api/v1/config/rules"
placementRulesByGroup = "/pd/api/v1/config/rules/group"
PlacementRuleBundle = "/pd/api/v1/config/placement-rule"
RegionLabelRule = "/pd/api/v1/config/region-label/rule"
RegionLabelRules = "/pd/api/v1/config/region-label/rules"
RegionLabelRulesByIDs = "/pd/api/v1/config/region-label/rules/ids"
// Scheduler
Schedulers = "/pd/api/v1/schedulers"
scatterRangeScheduler = "/pd/api/v1/schedulers/scatter-range-"
Expand Down Expand Up @@ -123,6 +126,16 @@ func PlacementRuleByGroupAndID(group, id string) string {
return fmt.Sprintf("%s/%s/%s", PlacementRule, group, id)
}

// PlacementRuleBundleByGroup returns the path of PD HTTP API to get placement rule bundle by group.
func PlacementRuleBundleByGroup(group string) string {
return fmt.Sprintf("%s/%s", PlacementRuleBundle, group)
}

// PlacementRuleBundleWithPartialParameter returns the path of PD HTTP API to get placement rule bundle with partial parameter.
func PlacementRuleBundleWithPartialParameter(partial bool) string {
return fmt.Sprintf("%s?partial=%t", PlacementRuleBundle, partial)
}

// SchedulerByName returns the scheduler API with the given scheduler name.
func SchedulerByName(name string) string {
return fmt.Sprintf("%s/%s", Schedulers, name)
Expand Down
147 changes: 131 additions & 16 deletions client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

// Client is a PD (Placement Driver) HTTP client.
type Client interface {
/* Meta-related interfaces */
GetRegionByID(context.Context, uint64) (*RegionInfo, error)
GetRegionByKey(context.Context, []byte) (*RegionInfo, error)
GetRegions(context.Context) (*RegionsInfo, error)
Expand All @@ -51,11 +52,24 @@
GetHotWriteRegions(context.Context) (*StoreHotPeersInfos, error)
GetRegionStatusByKeyRange(context.Context, []byte, []byte) (*RegionStats, error)
GetStores(context.Context) (*StoresInfo, error)
/* Rule-related interfaces */
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have two groups of API for the placement rule, maybe we can reduce it later.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, after replacing the existing TiDB code, we can reduce the exposed interfaces here.

GetAllPlacementRuleBundles(context.Context) ([]*GroupBundle, error)
GetPlacementRuleBundleByGroup(context.Context, string) (*GroupBundle, error)
GetPlacementRulesByGroup(context.Context, string) ([]*Rule, error)
SetPlacementRule(context.Context, *Rule) error
SetPlacementRuleBundles(context.Context, []*GroupBundle, bool) error
DeletePlacementRule(context.Context, string, string) error
GetMinResolvedTSByStoresIDs(context.Context, []uint64) (uint64, map[uint64]uint64, error)
GetAllRegionLabelRules(context.Context) ([]*LabelRule, error)
GetRegionLabelRulesByIDs(context.Context, []string) ([]*LabelRule, error)
SetRegionLabelRule(context.Context, *LabelRule) error
PatchRegionLabelRules(context.Context, *LabelRulePatch) error
/* Scheduling-related interfaces */
AccelerateSchedule(context.Context, []byte, []byte) error
/* Other interfaces */
GetMinResolvedTSByStoresIDs(context.Context, []uint64) (uint64, map[uint64]uint64, error)

/* Client-related methods */
WithRespHandler(func(resp *http.Response) error) Client
Close()
}

Expand All @@ -66,6 +80,8 @@
tlsConf *tls.Config
cli *http.Client

respHandler func(resp *http.Response) error

requestCounter *prometheus.CounterVec
executionDuration *prometheus.HistogramVec
}
Expand Down Expand Up @@ -143,6 +159,13 @@
log.Info("[pd] http client closed")
}

// WithRespHandler sets the client with the given HTTP response handler.
// This allows the caller to customize how the response is handled, including error handling logic.
func (c *client) WithRespHandler(handler func(resp *http.Response) error) Client {
c.respHandler = handler
return c

Check warning on line 166 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L165-L166

Added lines #L165 - L166 were not covered by tests
}

func (c *client) reqCounter(name, status string) {
if c.requestCounter == nil {
return
Expand Down Expand Up @@ -204,6 +227,12 @@
}
c.execDuration(name, time.Since(start))
c.reqCounter(name, resp.Status)

// Give away the response handling to the caller if the handler is set.
if c.respHandler != nil {
return c.respHandler(resp)

Check warning on line 233 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L233

Added line #L233 was not covered by tests
}

defer func() {
err = resp.Body.Close()
if err != nil {
Expand Down Expand Up @@ -345,6 +374,30 @@
return &stores, nil
}

// GetAllPlacementRuleBundles gets all placement rules bundles.
func (c *client) GetAllPlacementRuleBundles(ctx context.Context) ([]*GroupBundle, error) {
var bundles []*GroupBundle
err := c.requestWithRetry(ctx,
"GetPlacementRuleBundle", PlacementRuleBundle,
http.MethodGet, nil, &bundles)
if err != nil {
return nil, err

Check warning on line 384 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L384

Added line #L384 was not covered by tests
}
return bundles, nil
}

// GetPlacementRuleBundleByGroup gets the placement rules bundle by group.
func (c *client) GetPlacementRuleBundleByGroup(ctx context.Context, group string) (*GroupBundle, error) {
var bundle GroupBundle
err := c.requestWithRetry(ctx,
"GetPlacementRuleBundleByGroup", PlacementRuleBundleByGroup(group),
http.MethodGet, nil, &bundle)
if err != nil {
return nil, err

Check warning on line 396 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L396

Added line #L396 was not covered by tests
}
return &bundle, nil
}

// GetPlacementRulesByGroup gets the placement rules by group.
func (c *client) GetPlacementRulesByGroup(ctx context.Context, group string) ([]*Rule, error) {
var rules []*Rule
Expand All @@ -368,13 +421,90 @@
http.MethodPost, bytes.NewBuffer(ruleJSON), nil)
}

// SetPlacementRuleBundles sets the placement rule bundles.
// If `partial` is false, all old configurations will be over-written and dropped.
func (c *client) SetPlacementRuleBundles(ctx context.Context, bundles []*GroupBundle, partial bool) error {
bundlesJSON, err := json.Marshal(bundles)
if err != nil {
return errors.Trace(err)

Check warning on line 429 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L429

Added line #L429 was not covered by tests
}
return c.requestWithRetry(ctx,
"SetPlacementRuleBundles", PlacementRuleBundleWithPartialParameter(partial),
http.MethodPost, bytes.NewBuffer(bundlesJSON), nil)
}

// DeletePlacementRule deletes the placement rule.
func (c *client) DeletePlacementRule(ctx context.Context, group, id string) error {
return c.requestWithRetry(ctx,
"DeletePlacementRule", PlacementRuleByGroupAndID(group, id),
http.MethodDelete, nil, nil)
}

// GetAllRegionLabelRules gets all region label rules.
func (c *client) GetAllRegionLabelRules(ctx context.Context) ([]*LabelRule, error) {
var labelRules []*LabelRule
err := c.requestWithRetry(ctx,
"GetAllRegionLabelRules", RegionLabelRules,
http.MethodGet, nil, &labelRules)
if err != nil {
return nil, err

Check warning on line 450 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L450

Added line #L450 was not covered by tests
}
return labelRules, nil
}

// GetRegionLabelRulesByIDs gets the region label rules by IDs.
func (c *client) GetRegionLabelRulesByIDs(ctx context.Context, ruleIDs []string) ([]*LabelRule, error) {
idsJSON, err := json.Marshal(ruleIDs)
if err != nil {
return nil, errors.Trace(err)

Check warning on line 459 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L459

Added line #L459 was not covered by tests
}
var labelRules []*LabelRule
err = c.requestWithRetry(ctx,
"GetRegionLabelRulesByIDs", RegionLabelRulesByIDs,
http.MethodGet, bytes.NewBuffer(idsJSON), &labelRules)
if err != nil {
return nil, err

Check warning on line 466 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L466

Added line #L466 was not covered by tests
}
return labelRules, nil
}

// SetRegionLabelRule sets the region label rule.
func (c *client) SetRegionLabelRule(ctx context.Context, labelRule *LabelRule) error {
labelRuleJSON, err := json.Marshal(labelRule)
if err != nil {
return errors.Trace(err)

Check warning on line 475 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L475

Added line #L475 was not covered by tests
}
return c.requestWithRetry(ctx,
"SetRegionLabelRule", RegionLabelRule,
http.MethodPost, bytes.NewBuffer(labelRuleJSON), nil)
}

// PatchRegionLabelRules patches the region label rules.
func (c *client) PatchRegionLabelRules(ctx context.Context, labelRulePatch *LabelRulePatch) error {
labelRulePatchJSON, err := json.Marshal(labelRulePatch)
if err != nil {
return errors.Trace(err)

Check warning on line 486 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L486

Added line #L486 was not covered by tests
}
return c.requestWithRetry(ctx,
"PatchRegionLabelRules", RegionLabelRules,
http.MethodPatch, bytes.NewBuffer(labelRulePatchJSON), nil)
}

// AccelerateSchedule accelerates the scheduling of the regions within the given key range.
func (c *client) AccelerateSchedule(ctx context.Context, startKey, endKey []byte) error {
input := map[string]string{
"start_key": url.QueryEscape(string(startKey)),
"end_key": url.QueryEscape(string(endKey)),
}
inputJSON, err := json.Marshal(input)
if err != nil {
return errors.Trace(err)

Check warning on line 501 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L501

Added line #L501 was not covered by tests
}
return c.requestWithRetry(ctx,
"AccelerateSchedule", AccelerateSchedule,
http.MethodPost, bytes.NewBuffer(inputJSON), nil)
}

// GetMinResolvedTSByStoresIDs get min-resolved-ts by stores IDs.
func (c *client) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []uint64) (uint64, map[uint64]uint64, error) {
uri := MinResolvedTSPrefix
Expand Down Expand Up @@ -406,18 +536,3 @@
}
return resp.MinResolvedTS, resp.StoresMinResolvedTS, nil
}

// AccelerateSchedule accelerates the scheduling of the regions within the given key range.
func (c *client) AccelerateSchedule(ctx context.Context, startKey, endKey []byte) error {
input := map[string]string{
"start_key": url.QueryEscape(string(startKey)),
"end_key": url.QueryEscape(string(endKey)),
}
inputJSON, err := json.Marshal(input)
if err != nil {
return errors.Trace(err)
}
return c.requestWithRetry(ctx,
"AccelerateSchedule", accelerateSchedule,
http.MethodPost, bytes.NewBuffer(inputJSON), nil)
}
31 changes: 31 additions & 0 deletions client/http/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,3 +246,34 @@ type Rule struct {
Version uint64 `json:"version,omitempty"` // only set at runtime, add 1 each time rules updated, begin from 0.
CreateTimestamp uint64 `json:"create_timestamp,omitempty"` // only set at runtime, recorded rule create timestamp
}

// GroupBundle represents a rule group and all rules belong to the group.
type GroupBundle struct {
ID string `json:"group_id"`
Index int `json:"group_index"`
Override bool `json:"group_override"`
Rules []*Rule `json:"rules"`
}

// RegionLabel is the label of a region.
type RegionLabel struct {
Key string `json:"key"`
Value string `json:"value"`
TTL string `json:"ttl,omitempty"`
StartAt string `json:"start_at,omitempty"`
}

// LabelRule is the rule to assign labels to a region.
type LabelRule struct {
ID string `json:"id"`
Index int `json:"index"`
Labels []RegionLabel `json:"labels"`
RuleType string `json:"rule_type"`
Data interface{} `json:"data"`
}

// LabelRulePatch is the patch to update the label rules.
type LabelRulePatch struct {
SetRules []*LabelRule `json:"sets"`
DeleteRules []string `json:"deletes"`
}
Loading
Loading