Skip to content

Commit

Permalink
Merge branch 'master' into disable-witness-scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Jan 26, 2024
2 parents 8fc621a + 0d10882 commit 61a8601
Show file tree
Hide file tree
Showing 5 changed files with 318 additions and 14 deletions.
124 changes: 118 additions & 6 deletions tools/pd-api-bench/cases/cases.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

pd "github.com/tikv/pd/client"
pdHttp "github.com/tikv/pd/client/http"
"go.etcd.io/etcd/clientv3"
)

var (
Expand Down Expand Up @@ -111,6 +112,24 @@ func (c *baseCase) GetConfig() *Config {
return c.cfg.Clone()
}

// ETCDCase is the interface for all etcd api cases.
type ETCDCase interface {
Case
Init(context.Context, *clientv3.Client) error
Unary(context.Context, *clientv3.Client) error
}

// ETCDCraeteFn is function type to create ETCDCase.
type ETCDCraeteFn func() ETCDCase

// ETCDCaseFnMap is the map for all ETCD case creation function.
var ETCDCaseFnMap = map[string]ETCDCraeteFn{
"Get": newGetKV(),
"Put": newPutKV(),
"Delete": newDeleteKV(),
"Txn": newTxnKV(),
}

// GRPCCase is the interface for all gRPC cases.
type GRPCCase interface {
Case
Expand All @@ -130,9 +149,6 @@ var GRPCCaseFnMap = map[string]GRPCCraeteFn{
"Tso": newTso(),
}

// GRPCCaseMap is the map for all gRPC case creation function.
var GRPCCaseMap = map[string]GRPCCase{}

// HTTPCase is the interface for all HTTP cases.
type HTTPCase interface {
Case
Expand All @@ -148,9 +164,6 @@ var HTTPCaseFnMap = map[string]HTTPCraeteFn{
"GetMinResolvedTS": newMinResolvedTS(),
}

// HTTPCaseMap is the map for all HTTP cases.
var HTTPCaseMap = map[string]HTTPCase{}

type minResolvedTS struct {
*baseCase
}
Expand Down Expand Up @@ -366,3 +379,102 @@ func generateKeyForSimulator(id int, keyLen int) []byte {
copy(k, fmt.Sprintf("%010d", id))
return k
}

type getKV struct {
*baseCase
}

func newGetKV() func() ETCDCase {
return func() ETCDCase {
return &getKV{
baseCase: &baseCase{
name: "Get",
cfg: newConfig(),
},
}
}
}

func (c *getKV) Init(ctx context.Context, cli *clientv3.Client) error {
for i := 0; i < 100; i++ {
_, err := cli.Put(ctx, fmt.Sprintf("/test/0001/%4d", i), fmt.Sprintf("%4d", i))
if err != nil {
return err
}
}
return nil
}

func (c *getKV) Unary(ctx context.Context, cli *clientv3.Client) error {
_, err := cli.Get(ctx, "/test/0001", clientv3.WithPrefix())
return err
}

type putKV struct {
*baseCase
}

func newPutKV() func() ETCDCase {
return func() ETCDCase {
return &putKV{
baseCase: &baseCase{
name: "Put",
cfg: newConfig(),
},
}
}
}

func (c *putKV) Init(ctx context.Context, cli *clientv3.Client) error { return nil }

func (c *putKV) Unary(ctx context.Context, cli *clientv3.Client) error {
_, err := cli.Put(ctx, "/test/0001/0000", "test")
return err
}

type deleteKV struct {
*baseCase
}

func newDeleteKV() func() ETCDCase {
return func() ETCDCase {
return &deleteKV{
baseCase: &baseCase{
name: "Put",
cfg: newConfig(),
},
}
}
}

func (c *deleteKV) Init(ctx context.Context, cli *clientv3.Client) error { return nil }

func (c *deleteKV) Unary(ctx context.Context, cli *clientv3.Client) error {
_, err := cli.Delete(ctx, "/test/0001/0000")
return err
}

type txnKV struct {
*baseCase
}

func newTxnKV() func() ETCDCase {
return func() ETCDCase {
return &txnKV{
baseCase: &baseCase{
name: "Put",
cfg: newConfig(),
},
}
}
}

func (c *txnKV) Init(ctx context.Context, cli *clientv3.Client) error { return nil }

func (c *txnKV) Unary(ctx context.Context, cli *clientv3.Client) error {
txn := cli.Txn(ctx)
txn = txn.If(clientv3.Compare(clientv3.Value("/test/0001/0000"), "=", "test"))
txn = txn.Then(clientv3.OpPut("/test/0001/0000", "test2"))
_, err := txn.Commit()
return err
}
122 changes: 120 additions & 2 deletions tools/pd-api-bench/cases/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/log"
pd "github.com/tikv/pd/client"
pdHttp "github.com/tikv/pd/client/http"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
)

Expand All @@ -34,21 +35,25 @@ type Coordinator struct {

httpClients []pdHttp.Client
gRPCClients []pd.Client
etcdClients []*clientv3.Client

http map[string]*httpController
grpc map[string]*gRPCController
etcd map[string]*etcdController

mu sync.RWMutex
}

// NewCoordinator returns a new coordinator.
func NewCoordinator(ctx context.Context, httpClients []pdHttp.Client, gRPCClients []pd.Client) *Coordinator {
func NewCoordinator(ctx context.Context, httpClients []pdHttp.Client, gRPCClients []pd.Client, etcdClients []*clientv3.Client) *Coordinator {
return &Coordinator{
ctx: ctx,
httpClients: httpClients,
gRPCClients: gRPCClients,
etcdClients: etcdClients,
http: make(map[string]*httpController),
grpc: make(map[string]*gRPCController),
etcd: make(map[string]*etcdController),
}
}

Expand All @@ -72,6 +77,16 @@ func (c *Coordinator) GetGRPCCase(name string) (*Config, error) {
return nil, errors.Errorf("case %v does not exist.", name)
}

// GetETCDCase returns the etcd case config.
func (c *Coordinator) GetETCDCase(name string) (*Config, error) {
c.mu.RLock()
defer c.mu.RUnlock()
if controller, ok := c.etcd[name]; ok {
return controller.GetConfig(), nil
}
return nil, errors.Errorf("case %v does not exist.", name)
}

// GetAllHTTPCases returns the all HTTP case configs.
func (c *Coordinator) GetAllHTTPCases() map[string]*Config {
c.mu.RLock()
Expand All @@ -94,6 +109,17 @@ func (c *Coordinator) GetAllGRPCCases() map[string]*Config {
return ret
}

// GetAllETCDCases returns the all etcd case configs.
func (c *Coordinator) GetAllETCDCases() map[string]*Config {
c.mu.RLock()
defer c.mu.RUnlock()
ret := make(map[string]*Config)
for name, c := range c.etcd {
ret[name] = c.GetConfig()
}
return ret
}

// SetHTTPCase sets the config for the specific case.
func (c *Coordinator) SetHTTPCase(name string, cfg *Config) error {
c.mu.Lock()
Expand Down Expand Up @@ -133,7 +159,29 @@ func (c *Coordinator) SetGRPCCase(name string, cfg *Config) error {
}
controller.run()
} else {
return errors.Errorf("HTTP case %s not implemented", name)
return errors.Errorf("gRPC case %s not implemented", name)
}
return nil
}

// SetETCDCase sets the config for the specific case.
func (c *Coordinator) SetETCDCase(name string, cfg *Config) error {
c.mu.Lock()
defer c.mu.Unlock()
if fn, ok := ETCDCaseFnMap[name]; ok {
var controller *etcdController
if controller, ok = c.etcd[name]; !ok {
controller = newEtcdController(c.ctx, c.etcdClients, fn)
c.etcd[name] = controller
}
controller.stop()
controller.SetQPS(cfg.QPS)
if cfg.Burst > 0 {
controller.SetBurst(cfg.Burst)
}
controller.run()
} else {
return errors.Errorf("etcd case %s not implemented", name)
}
return nil
}
Expand Down Expand Up @@ -266,3 +314,73 @@ func (c *gRPCController) stop() {
c.cancel = nil
c.wg.Wait()
}

type etcdController struct {
ETCDCase
clients []*clientv3.Client
pctx context.Context

ctx context.Context
cancel context.CancelFunc

wg sync.WaitGroup
}

func newEtcdController(ctx context.Context, clis []*clientv3.Client, fn ETCDCraeteFn) *etcdController {
c := &etcdController{
pctx: ctx,
clients: clis,
ETCDCase: fn(),
}
return c
}

// run tries to run the gRPC api bench.
func (c *etcdController) run() {
if c.GetQPS() <= 0 || c.cancel != nil {
return
}
c.ctx, c.cancel = context.WithCancel(c.pctx)
qps := c.GetQPS()
burst := c.GetBurst()
cliNum := int64(len(c.clients))
tt := time.Duration(base/qps*burst*cliNum) * time.Microsecond
log.Info("begin to run etcd case", zap.String("case", c.Name()), zap.Int64("qps", qps), zap.Int64("burst", burst), zap.Duration("interval", tt))
err := c.Init(c.ctx, c.clients[0])
if err != nil {
log.Error("init error", zap.String("case", c.Name()), zap.Error(err))
return
}
for _, cli := range c.clients {
c.wg.Add(1)
go func(cli *clientv3.Client) {
defer c.wg.Done()
var ticker = time.NewTicker(tt)
defer ticker.Stop()
for {
select {
case <-ticker.C:
for i := int64(0); i < burst; i++ {
err := c.Unary(c.ctx, cli)
if err != nil {
log.Error("meet erorr when doing etcd request", zap.String("case", c.Name()), zap.Error(err))
}
}
case <-c.ctx.Done():
log.Info("Got signal to exit running etcd case")
return
}
}
}(cli)
}
}

// stop stops the etcd api bench.
func (c *etcdController) stop() {
if c.cancel == nil {
return
}
c.cancel()
c.cancel = nil
c.wg.Wait()
}
7 changes: 7 additions & 0 deletions tools/pd-api-bench/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type Config struct {
// only for init
HTTP map[string]cases.Config `toml:"http" json:"http"`
GRPC map[string]cases.Config `toml:"grpc" json:"grpc"`
ETCD map[string]cases.Config `toml:"etcd" json:"etcd"`
}

// NewConfig return a set of settings.
Expand Down Expand Up @@ -108,6 +109,12 @@ func (c *Config) InitCoordinator(co *cases.Coordinator) {
log.Error("create gRPC case failed", zap.Error(err))
}
}
for name, cfg := range c.ETCD {
err := co.SetETCDCase(name, &cfg)
if err != nil {
log.Error("create etcd case failed", zap.Error(err))
}
}
}

// Adjust is used to adjust configurations
Expand Down
4 changes: 4 additions & 0 deletions tools/pd-api-bench/config/simconfig.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,8 @@ pd = "127.0.0.1:2379"
burst = 1
[grpc.GetStores]
qps = 1000
burst = 1
[etcd]
[etcd.Get]
qps = 1
burst = 1
Loading

0 comments on commit 61a8601

Please sign in to comment.