Skip to content

Commit

Permalink
Introduce the HTTP client
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Nov 6, 2023
1 parent ab8bf7b commit acc936a
Show file tree
Hide file tree
Showing 4 changed files with 507 additions and 2 deletions.
4 changes: 2 additions & 2 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ type GlobalConfigItem struct {
PayLoad []byte
}

// Client is a PD (Placement Driver) client.
// Client is a PD (Placement Driver) RPC client.
// It should not be used after calling Close().
type Client interface {
// GetClusterID gets the cluster ID from PD.
Expand Down Expand Up @@ -1062,7 +1062,7 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int)
defer span.Finish()
}
start := time.Now()
defer cmdDurationScanRegions.Observe(time.Since(start).Seconds())
defer func() { cmdDurationScanRegions.Observe(time.Since(start).Seconds()) }()

var cancel context.CancelFunc
scanCtx := ctx
Expand Down
54 changes: 54 additions & 0 deletions client/http/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package http

import (
"fmt"
"net/url"
)

// The following constants are the paths of PD HTTP APIs.
const (
HotRead = "/pd/api/v1/hotspot/regions/read"
HotWrite = "/pd/api/v1/hotspot/regions/write"
Regions = "/pd/api/v1/regions"
regionByID = "/pd/api/v1/region/id"
regionByKey = "/pd/api/v1/region/key"
regionsByKey = "/pd/api/v1/regions/key"
regionsByStoreID = "/pd/api/v1/regions/store"
Stores = "/pd/api/v1/stores"
MinResolvedTSPrefix = "/pd/api/v1/min-resolved-ts"
)

// RegionByID returns the path of PD HTTP API to get region by ID.
func RegionByID(regionID uint64) string {
return fmt.Sprintf("%s/%d", regionByID, regionID)
}

// RegionByKey returns the path of PD HTTP API to get region by key.
func RegionByKey(key []byte) string {
return fmt.Sprintf("%s/%s", regionByKey, url.QueryEscape(string(key)))
}

// RegionsByKey returns the path of PD HTTP API to scan regions with given start key, end key and limit parameters.
func RegionsByKey(startKey, endKey []byte, limit int) string {
return fmt.Sprintf("%s?start_key=%s&end_key=%s&limit=%d",
regionsByKey, url.QueryEscape(string(startKey)), url.QueryEscape(string(endKey)), limit)
}

// RegionsByStoreID returns the path of PD HTTP API to get regions by store ID.
func RegionsByStoreID(storeID uint64) string {
return fmt.Sprintf("%s/%d", regionsByStoreID, storeID)
}
276 changes: 276 additions & 0 deletions client/http/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,276 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package http

import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"go.uber.org/zap"
)

const defaultTimeout = 30 * time.Second

// HTTPClient is a PD (Placement Driver) HTTP client.
type HTTPClient interface {
GetRegionByID(context.Context, uint64) (*RegionInfo, error)
GetRegionByKey(context.Context, []byte) (*RegionInfo, error)
GetRegions(context.Context) (*RegionsInfo, error)
GetRegionsByKey(context.Context, []byte, []byte, int) (*RegionsInfo, error)
GetRegionsByStoreID(context.Context, uint64) (*RegionsInfo, error)
GetHotReadRegions(context.Context) (*StoreHotPeersInfos, error)
GetHotWriteRegions(context.Context) (*StoreHotPeersInfos, error)
GetStores(context.Context) (*StoresInfo, error)
GetMinResolvedTSByStoresIDs(context.Context, []string) (uint64, map[uint64]uint64, error)
Close()
}

var _ HTTPClient = (*httpClient)(nil)

type httpClient struct {
pdAddrs []string
tlsConf *tls.Config
cli *http.Client
}

// HTTPClientOption configures the HTTP client.
type HTTPClientOption func(hc *httpClient)

// WithHTTPClient configures the client with the given initialized HTTP client.
func WithHTTPClient(cli *http.Client) HTTPClientOption {
return func(hc *httpClient) {
hc.cli = cli
}
}

// WithTLSConfig configures the client with the given TLS config.
// This option won't work if the client is configured with WithHTTPClient.
func WithTLSConfig(tlsConf *tls.Config) HTTPClientOption {
return func(hc *httpClient) {
hc.tlsConf = tlsConf
}
}

// NewHTTPClient creates a PD HTTP client with the given PD addresses and TLS config.
func NewHTTPClient(
pdAddrs []string,
opts ...HTTPClientOption,
) HTTPClient {
hc := &httpClient{}
// Apply the options first.
for _, opt := range opts {
opt(hc)
}
// Normalize the addresses with correct scheme prefix.
for i, addr := range pdAddrs {
if !strings.HasPrefix(addr, "http") {
if hc.tlsConf != nil {
addr = "https://" + addr
} else {
addr = "http://" + addr
}
pdAddrs[i] = addr
}
}
hc.pdAddrs = pdAddrs
// Init the HTTP client.
if hc.cli != nil {
cli := &http.Client{Timeout: defaultTimeout}
if hc.tlsConf != nil {
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.TLSClientConfig = hc.tlsConf
cli.Transport = transport
}
}

return hc
}

// Close closes the HTTP client.
func (hc *httpClient) Close() {
if hc.cli != nil {
hc.cli.CloseIdleConnections()
}
log.Info("[pd] http client closed")
}

func (hc *httpClient) pdAddr() string {
// TODO: support the customized PD address selection strategy.
return hc.pdAddrs[0]
}

func (hc *httpClient) request(
ctx context.Context,
name, uri string,
res interface{},
) error {
reqURL := fmt.Sprintf("%s%s", hc.pdAddr(), uri)
logFields := []zap.Field{
zap.String("name", name),
zap.String("url", reqURL),
}
log.Debug("[pd] request the http url", logFields...)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqURL, nil)
if err != nil {
log.Error("[pd] create http request failed", append(logFields, zap.Error(err))...)
return errors.Trace(err)
}
// TODO: integrate the metrics.
resp, err := hc.cli.Do(req)
if err != nil {
log.Error("[pd] do http request failed", append(logFields, zap.Error(err))...)
return errors.Trace(err)
}
defer func() {
err = resp.Body.Close()
if err != nil {
log.Warn("[pd] close http response body failed", append(logFields, zap.Error(err))...)
}
}()

if resp.StatusCode != http.StatusOK {
logFields = append(logFields, zap.String("status", resp.Status))

bs, readErr := io.ReadAll(resp.Body)
if readErr != nil {
logFields = append(logFields, zap.NamedError("read-body-error", err))
} else {
logFields = append(logFields, zap.ByteString("body", bs))
}

log.Error("[pd] request failed with a non-200 status", logFields...)
return errors.Errorf("request pd http api failed with status: '%s'", resp.Status)
}

err = json.NewDecoder(resp.Body).Decode(res)
if err != nil {
return errors.Trace(err)
}
return nil
}

// GetRegionByID gets the region info by ID.
func (hc *httpClient) GetRegionByID(ctx context.Context, regionID uint64) (*RegionInfo, error) {
var region RegionInfo
err := hc.request(ctx, "GetRegionByID", RegionByID(regionID), &region)
if err != nil {
return nil, err
}
return &region, nil
}

// GetRegionByKey gets the region info by key.
func (hc *httpClient) GetRegionByKey(ctx context.Context, key []byte) (*RegionInfo, error) {
var region RegionInfo
err := hc.request(ctx, "GetRegionByKey", RegionByKey(key), &region)
if err != nil {
return nil, err
}
return &region, nil
}

// GetRegions gets the regions info.
func (hc *httpClient) GetRegions(ctx context.Context) (*RegionsInfo, error) {
var regions RegionsInfo
err := hc.request(ctx, "GetRegions", Regions, &regions)
if err != nil {
return nil, err
}
return &regions, nil
}

// GetRegionsByKey gets the regions info by key range. If the limit is -1, it will return all regions within the range.
func (hc *httpClient) GetRegionsByKey(ctx context.Context, startKey, endKey []byte, limit int) (*RegionsInfo, error) {
var regions RegionsInfo
err := hc.request(ctx, "GetRegionsByKey", RegionsByKey(startKey, endKey, limit), &regions)
if err != nil {
return nil, err
}
return &regions, nil
}

// GetRegionsByStoreID gets the regions info by store ID.
func (hc *httpClient) GetRegionsByStoreID(ctx context.Context, storeID uint64) (*RegionsInfo, error) {
var regions RegionsInfo
err := hc.request(ctx, "GetRegionsByStoreID", RegionsByStoreID(storeID), &regions)
if err != nil {
return nil, err
}
return &regions, nil
}

// GetHotReadRegions gets the hot read region statistics info.
func (hc *httpClient) GetHotReadRegions(ctx context.Context) (*StoreHotPeersInfos, error) {
var hotReadRegions StoreHotPeersInfos
err := hc.request(ctx, "GetHotReadRegions", HotRead, &hotReadRegions)
if err != nil {
return nil, err
}
return &hotReadRegions, nil
}

// GetHotWriteRegions gets the hot write region statistics info.
func (hc *httpClient) GetHotWriteRegions(ctx context.Context) (*StoreHotPeersInfos, error) {
var hotWriteRegions StoreHotPeersInfos
err := hc.request(ctx, "GetHotWriteRegions", HotWrite, &hotWriteRegions)
if err != nil {
return nil, err
}
return &hotWriteRegions, nil
}

// GetStores gets the stores info.
func (hc *httpClient) GetStores(ctx context.Context) (*StoresInfo, error) {
var stores StoresInfo
err := hc.request(ctx, "GetStores", Stores, &stores)
if err != nil {
return nil, err
}
return &stores, nil
}

// GetMinResolvedTSByStoresIDs get min-resolved-ts by stores IDs.
func (hc *httpClient) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []string) (uint64, map[uint64]uint64, error) {
uri := MinResolvedTSPrefix
// scope is an optional parameter, it can be `cluster` or specified store IDs.
// - When no scope is given, cluster-level's min_resolved_ts will be returned and storesMinResolvedTS will be nil.
// - When scope is `cluster`, cluster-level's min_resolved_ts will be returned and storesMinResolvedTS will be filled.
// - When scope given a list of stores, min_resolved_ts will be provided for each store
// and the scope-specific min_resolved_ts will be returned.
if len(storeIDs) != 0 {
uri = fmt.Sprintf("%s?scope=%s", uri, strings.Join(storeIDs, ","))
}
resp := struct {
MinResolvedTS uint64 `json:"min_resolved_ts"`
IsRealTime bool `json:"is_real_time,omitempty"`
StoresMinResolvedTS map[uint64]uint64 `json:"stores_min_resolved_ts"`
}{}
err := hc.request(ctx, "GetMinResolvedTSByStoresIDs", uri, &resp)
if err != nil {
return 0, nil, err
}
if !resp.IsRealTime {
return 0, nil, errors.Trace(errors.New("min resolved ts is not enabled"))
}
return resp.MinResolvedTS, resp.StoresMinResolvedTS, nil
}
Loading

0 comments on commit acc936a

Please sign in to comment.