diff --git a/client/client.go b/client/client.go index 067872d2d391..56923b697e27 100644 --- a/client/client.go +++ b/client/client.go @@ -74,7 +74,7 @@ type GlobalConfigItem struct { PayLoad []byte } -// Client is a PD (Placement Driver) client. +// Client is a PD (Placement Driver) RPC client. // It should not be used after calling Close(). type Client interface { // GetClusterID gets the cluster ID from PD. @@ -1062,7 +1062,7 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int) defer span.Finish() } start := time.Now() - defer cmdDurationScanRegions.Observe(time.Since(start).Seconds()) + defer func() { cmdDurationScanRegions.Observe(time.Since(start).Seconds()) }() var cancel context.CancelFunc scanCtx := ctx diff --git a/client/http/api.go b/client/http/api.go new file mode 100644 index 000000000000..5326919561d1 --- /dev/null +++ b/client/http/api.go @@ -0,0 +1,54 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package http + +import ( + "fmt" + "net/url" +) + +// The following constants are the paths of PD HTTP APIs. +const ( + HotRead = "/pd/api/v1/hotspot/regions/read" + HotWrite = "/pd/api/v1/hotspot/regions/write" + Regions = "/pd/api/v1/regions" + regionByID = "/pd/api/v1/region/id" + regionByKey = "/pd/api/v1/region/key" + regionsByKey = "/pd/api/v1/regions/key" + regionsByStoreID = "/pd/api/v1/regions/store" + Stores = "/pd/api/v1/stores" + MinResolvedTSPrefix = "/pd/api/v1/min-resolved-ts" +) + +// RegionByID returns the path of PD HTTP API to get region by ID. +func RegionByID(regionID uint64) string { + return fmt.Sprintf("%s/%d", regionByID, regionID) +} + +// RegionByKey returns the path of PD HTTP API to get region by key. +func RegionByKey(key []byte) string { + return fmt.Sprintf("%s/%s", regionByKey, url.QueryEscape(string(key))) +} + +// RegionsByKey returns the path of PD HTTP API to scan regions with given start key, end key and limit parameters. +func RegionsByKey(startKey, endKey []byte, limit int) string { + return fmt.Sprintf("%s?start_key=%s&end_key=%s&limit=%d", + regionsByKey, url.QueryEscape(string(startKey)), url.QueryEscape(string(endKey)), limit) +} + +// RegionsByStoreID returns the path of PD HTTP API to get regions by store ID. +func RegionsByStoreID(storeID uint64) string { + return fmt.Sprintf("%s/%d", regionsByStoreID, storeID) +} diff --git a/client/http/client.go b/client/http/client.go new file mode 100644 index 000000000000..906ef9f1d6b1 --- /dev/null +++ b/client/http/client.go @@ -0,0 +1,276 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package http + +import ( + "context" + "crypto/tls" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "go.uber.org/zap" +) + +const defaultTimeout = 30 * time.Second + +// HTTPClient is a PD (Placement Driver) HTTP client. +type HTTPClient interface { + GetRegionByID(context.Context, uint64) (*RegionInfo, error) + GetRegionByKey(context.Context, []byte) (*RegionInfo, error) + GetRegions(context.Context) (*RegionsInfo, error) + GetRegionsByKey(context.Context, []byte, []byte, int) (*RegionsInfo, error) + GetRegionsByStoreID(context.Context, uint64) (*RegionsInfo, error) + GetHotReadRegions(context.Context) (*StoreHotPeersInfos, error) + GetHotWriteRegions(context.Context) (*StoreHotPeersInfos, error) + GetStores(context.Context) (*StoresInfo, error) + GetMinResolvedTSByStoresIDs(context.Context, []string) (uint64, map[uint64]uint64, error) + Close() +} + +var _ HTTPClient = (*httpClient)(nil) + +type httpClient struct { + pdAddrs []string + tlsConf *tls.Config + cli *http.Client +} + +// HTTPClientOption configures the HTTP client. +type HTTPClientOption func(hc *httpClient) + +// WithHTTPClient configures the client with the given initialized HTTP client. +func WithHTTPClient(cli *http.Client) HTTPClientOption { + return func(hc *httpClient) { + hc.cli = cli + } +} + +// WithTLSConfig configures the client with the given TLS config. +// This option won't work if the client is configured with WithHTTPClient. +func WithTLSConfig(tlsConf *tls.Config) HTTPClientOption { + return func(hc *httpClient) { + hc.tlsConf = tlsConf + } +} + +// NewHTTPClient creates a PD HTTP client with the given PD addresses and TLS config. +func NewHTTPClient( + pdAddrs []string, + opts ...HTTPClientOption, +) HTTPClient { + hc := &httpClient{} + // Apply the options first. + for _, opt := range opts { + opt(hc) + } + // Normalize the addresses with correct scheme prefix. + for i, addr := range pdAddrs { + if !strings.HasPrefix(addr, "http") { + if hc.tlsConf != nil { + addr = "https://" + addr + } else { + addr = "http://" + addr + } + pdAddrs[i] = addr + } + } + hc.pdAddrs = pdAddrs + // Init the HTTP client. + if hc.cli != nil { + cli := &http.Client{Timeout: defaultTimeout} + if hc.tlsConf != nil { + transport := http.DefaultTransport.(*http.Transport).Clone() + transport.TLSClientConfig = hc.tlsConf + cli.Transport = transport + } + } + + return hc +} + +// Close closes the HTTP client. +func (hc *httpClient) Close() { + if hc.cli != nil { + hc.cli.CloseIdleConnections() + } + log.Info("[pd] http client closed") +} + +func (hc *httpClient) pdAddr() string { + // TODO: support the customized PD address selection strategy. + return hc.pdAddrs[0] +} + +func (hc *httpClient) request( + ctx context.Context, + name, uri string, + res interface{}, +) error { + reqURL := fmt.Sprintf("%s%s", hc.pdAddr(), uri) + logFields := []zap.Field{ + zap.String("name", name), + zap.String("url", reqURL), + } + log.Debug("[pd] request the http url", logFields...) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqURL, nil) + if err != nil { + log.Error("[pd] create http request failed", append(logFields, zap.Error(err))...) + return errors.Trace(err) + } + // TODO: integrate the metrics. + resp, err := hc.cli.Do(req) + if err != nil { + log.Error("[pd] do http request failed", append(logFields, zap.Error(err))...) + return errors.Trace(err) + } + defer func() { + err = resp.Body.Close() + if err != nil { + log.Warn("[pd] close http response body failed", append(logFields, zap.Error(err))...) + } + }() + + if resp.StatusCode != http.StatusOK { + logFields = append(logFields, zap.String("status", resp.Status)) + + bs, readErr := io.ReadAll(resp.Body) + if readErr != nil { + logFields = append(logFields, zap.NamedError("read-body-error", err)) + } else { + logFields = append(logFields, zap.ByteString("body", bs)) + } + + log.Error("[pd] request failed with a non-200 status", logFields...) + return errors.Errorf("request pd http api failed with status: '%s'", resp.Status) + } + + err = json.NewDecoder(resp.Body).Decode(res) + if err != nil { + return errors.Trace(err) + } + return nil +} + +// GetRegionByID gets the region info by ID. +func (hc *httpClient) GetRegionByID(ctx context.Context, regionID uint64) (*RegionInfo, error) { + var region RegionInfo + err := hc.request(ctx, "GetRegionByID", RegionByID(regionID), ®ion) + if err != nil { + return nil, err + } + return ®ion, nil +} + +// GetRegionByKey gets the region info by key. +func (hc *httpClient) GetRegionByKey(ctx context.Context, key []byte) (*RegionInfo, error) { + var region RegionInfo + err := hc.request(ctx, "GetRegionByKey", RegionByKey(key), ®ion) + if err != nil { + return nil, err + } + return ®ion, nil +} + +// GetRegions gets the regions info. +func (hc *httpClient) GetRegions(ctx context.Context) (*RegionsInfo, error) { + var regions RegionsInfo + err := hc.request(ctx, "GetRegions", Regions, ®ions) + if err != nil { + return nil, err + } + return ®ions, nil +} + +// GetRegionsByKey gets the regions info by key range. If the limit is -1, it will return all regions within the range. +func (hc *httpClient) GetRegionsByKey(ctx context.Context, startKey, endKey []byte, limit int) (*RegionsInfo, error) { + var regions RegionsInfo + err := hc.request(ctx, "GetRegionsByKey", RegionsByKey(startKey, endKey, limit), ®ions) + if err != nil { + return nil, err + } + return ®ions, nil +} + +// GetRegionsByStoreID gets the regions info by store ID. +func (hc *httpClient) GetRegionsByStoreID(ctx context.Context, storeID uint64) (*RegionsInfo, error) { + var regions RegionsInfo + err := hc.request(ctx, "GetRegionsByStoreID", RegionsByStoreID(storeID), ®ions) + if err != nil { + return nil, err + } + return ®ions, nil +} + +// GetHotReadRegions gets the hot read region statistics info. +func (hc *httpClient) GetHotReadRegions(ctx context.Context) (*StoreHotPeersInfos, error) { + var hotReadRegions StoreHotPeersInfos + err := hc.request(ctx, "GetHotReadRegions", HotRead, &hotReadRegions) + if err != nil { + return nil, err + } + return &hotReadRegions, nil +} + +// GetHotWriteRegions gets the hot write region statistics info. +func (hc *httpClient) GetHotWriteRegions(ctx context.Context) (*StoreHotPeersInfos, error) { + var hotWriteRegions StoreHotPeersInfos + err := hc.request(ctx, "GetHotWriteRegions", HotWrite, &hotWriteRegions) + if err != nil { + return nil, err + } + return &hotWriteRegions, nil +} + +// GetStores gets the stores info. +func (hc *httpClient) GetStores(ctx context.Context) (*StoresInfo, error) { + var stores StoresInfo + err := hc.request(ctx, "GetStores", Stores, &stores) + if err != nil { + return nil, err + } + return &stores, nil +} + +// GetMinResolvedTSByStoresIDs get min-resolved-ts by stores IDs. +func (hc *httpClient) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []string) (uint64, map[uint64]uint64, error) { + uri := MinResolvedTSPrefix + // scope is an optional parameter, it can be `cluster` or specified store IDs. + // - When no scope is given, cluster-level's min_resolved_ts will be returned and storesMinResolvedTS will be nil. + // - When scope is `cluster`, cluster-level's min_resolved_ts will be returned and storesMinResolvedTS will be filled. + // - When scope given a list of stores, min_resolved_ts will be provided for each store + // and the scope-specific min_resolved_ts will be returned. + if len(storeIDs) != 0 { + uri = fmt.Sprintf("%s?scope=%s", uri, strings.Join(storeIDs, ",")) + } + resp := struct { + MinResolvedTS uint64 `json:"min_resolved_ts"` + IsRealTime bool `json:"is_real_time,omitempty"` + StoresMinResolvedTS map[uint64]uint64 `json:"stores_min_resolved_ts"` + }{} + err := hc.request(ctx, "GetMinResolvedTSByStoresIDs", uri, &resp) + if err != nil { + return 0, nil, err + } + if !resp.IsRealTime { + return 0, nil, errors.Trace(errors.New("min resolved ts is not enabled")) + } + return resp.MinResolvedTS, resp.StoresMinResolvedTS, nil +} diff --git a/client/http/types.go b/client/http/types.go new file mode 100644 index 000000000000..2d345084a921 --- /dev/null +++ b/client/http/types.go @@ -0,0 +1,175 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package http + +import "time" + +// NOTICE: the structures below are copied from the PD API definitions. +// Please make sure the consistency if any change happens to the PD API. + +// RegionInfo stores the information of one region. +type RegionInfo struct { + ID int64 `json:"id"` + StartKey string `json:"start_key"` + EndKey string `json:"end_key"` + Epoch RegionEpoch `json:"epoch"` + Peers []RegionPeer `json:"peers"` + Leader RegionPeer `json:"leader"` + DownPeers []RegionPeerStat `json:"down_peers"` + PendingPeers []RegionPeer `json:"pending_peers"` + WrittenBytes uint64 `json:"written_bytes"` + ReadBytes uint64 `json:"read_bytes"` + ApproximateSize int64 `json:"approximate_size"` + ApproximateKeys int64 `json:"approximate_keys"` + + ReplicationStatus *ReplicationStatus `json:"replication_status,omitempty"` +} + +func (r *RegionInfo) GetStartKey() string { return r.StartKey } +func (r *RegionInfo) GetEndKey() string { return r.EndKey } + +// RegionEpoch stores the information about its epoch. +type RegionEpoch struct { + ConfVer int64 `json:"conf_ver"` + Version int64 `json:"version"` +} + +// RegionPeer stores information of one peer. +type RegionPeer struct { + ID int64 `json:"id"` + StoreID int64 `json:"store_id"` + IsLearner bool `json:"is_learner"` +} + +// RegionPeerStat stores one field `DownSec` which indicates how long it's down than `RegionPeer`. +type RegionPeerStat struct { + Peer RegionPeer `json:"peer"` + DownSec int64 `json:"down_seconds"` +} + +// ReplicationStatus represents the replication mode status of the region. +type ReplicationStatus struct { + State string `json:"state"` + StateID int64 `json:"state_id"` +} + +// RegionsInfo stores the information of regions. +type RegionsInfo struct { + Count int64 `json:"count"` + Regions []RegionInfo `json:"regions"` +} + +// Merge merges two RegionsInfo together and returns a new one. +func (ri *RegionsInfo) Merge(other *RegionsInfo) *RegionsInfo { + newRegionsInfo := &RegionsInfo{ + Regions: make([]RegionInfo, 0, ri.Count+other.Count), + } + m := make(map[int64]RegionInfo, ri.Count+other.Count) + for _, region := range ri.Regions { + m[region.ID] = region + } + for _, region := range other.Regions { + m[region.ID] = region + } + for _, region := range m { + newRegionsInfo.Regions = append(newRegionsInfo.Regions, region) + } + newRegionsInfo.Count = int64(len(newRegionsInfo.Regions)) + return newRegionsInfo +} + +// StoreHotPeersInfos is used to get human-readable description for hot regions. +type StoreHotPeersInfos struct { + AsPeer StoreHotPeersStat `json:"as_peer"` + AsLeader StoreHotPeersStat `json:"as_leader"` +} + +// StoreHotPeersStat is used to record the hot region statistics group by store. +type StoreHotPeersStat map[uint64]*HotPeersStat + +// HotPeersStat records all hot regions statistics +type HotPeersStat struct { + StoreByteRate float64 `json:"store_bytes"` + StoreKeyRate float64 `json:"store_keys"` + StoreQueryRate float64 `json:"store_query"` + TotalBytesRate float64 `json:"total_flow_bytes"` + TotalKeysRate float64 `json:"total_flow_keys"` + TotalQueryRate float64 `json:"total_flow_query"` + Count int `json:"regions_count"` + Stats []HotPeerStatShow `json:"statistics"` +} + +// HotPeerStatShow records the hot region statistics for output +type HotPeerStatShow struct { + StoreID uint64 `json:"store_id"` + Stores []uint64 `json:"stores"` + IsLeader bool `json:"is_leader"` + IsLearner bool `json:"is_learner"` + RegionID uint64 `json:"region_id"` + HotDegree int `json:"hot_degree"` + ByteRate float64 `json:"flow_bytes"` + KeyRate float64 `json:"flow_keys"` + QueryRate float64 `json:"flow_query"` + AntiCount int `json:"anti_count"` + LastUpdateTime time.Time `json:"last_update_time,omitempty"` +} + +// StoresInfo represents the information of all TiKV/TiFlash stores. +type StoresInfo struct { + Count int `json:"count"` + Stores []StoreInfo `json:"stores"` +} + +// StoreInfo represents the information of one TiKV/TiFlash store. +type StoreInfo struct { + Store MetaStore `json:"store"` + Status StoreStatus `json:"status"` +} + +// MetaStore represents the meta information of one store. +type MetaStore struct { + ID int64 `json:"id"` + Address string `json:"address"` + State int64 `json:"state"` + StateName string `json:"state_name"` + Version string `json:"version"` + Labels []StoreLabel `json:"labels"` + StatusAddress string `json:"status_address"` + GitHash string `json:"git_hash"` + StartTimestamp int64 `json:"start_timestamp"` +} + +// StoreLabel stores the information of one store label. +type StoreLabel struct { + Key string `json:"key"` + Value string `json:"value"` +} + +// StoreStatus stores the detail information of one store. +type StoreStatus struct { + Capacity string `json:"capacity"` + Available string `json:"available"` + LeaderCount int64 `json:"leader_count"` + LeaderWeight float64 `json:"leader_weight"` + LeaderScore float64 `json:"leader_score"` + LeaderSize int64 `json:"leader_size"` + RegionCount int64 `json:"region_count"` + RegionWeight float64 `json:"region_weight"` + RegionScore float64 `json:"region_score"` + RegionSize int64 `json:"region_size"` + StartTS time.Time `json:"start_ts"` + LastHeartbeatTS time.Time `json:"last_heartbeat_ts"` + Uptime string `json:"uptime"` +}