Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

config: refactor TSO config #5902

Merged
merged 4 commits into from
Feb 6, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions pkg/member/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/server/config"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
"go.uber.org/zap"
Expand Down Expand Up @@ -250,12 +249,12 @@ func (m *Member) isSameLeader(leader *pdpb.Member) bool {
}

// MemberInfo initializes the member info.
func (m *Member) MemberInfo(cfg *config.Config, name string, rootPath string) {
func (m *Member) MemberInfo(advertiseClientUrls, advertisePeerUrls, name string, rootPath string) {
leader := &pdpb.Member{
Name: name,
MemberId: m.ID(),
ClientUrls: strings.Split(cfg.AdvertiseClientUrls, ","),
PeerUrls: strings.Split(cfg.AdvertisePeerUrls, ","),
ClientUrls: strings.Split(advertiseClientUrls, ","),
PeerUrls: strings.Split(advertisePeerUrls, ","),
}

data, err := leader.Marshal()
Expand Down
15 changes: 8 additions & 7 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ const (
leaderTickInterval = 50 * time.Millisecond
localTSOAllocatorEtcdPrefix = "lta"
localTSOSuffixEtcdPrefix = "lts"
// The value should be the same as the variable defined in server's config.
defaultTSOUpdatePhysicalInterval = 50 * time.Millisecond
)

var (
Expand Down Expand Up @@ -134,17 +132,20 @@ type AllocatorManager struct {
func NewAllocatorManager(
m *member.Member,
rootPath string,
cfg config,
enableLocalTSO bool,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

passing all tso configuration fields here seems to be a litter weird, but we can refine later.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we can define an interface later to avoid it.

saveInterval time.Duration,
updatePhysicalInterval time.Duration,
tlsConfig *grpcutil.TLSConfig,
maxResetTSGap func() time.Duration,
) *AllocatorManager {
allocatorManager := &AllocatorManager{
enableLocalTSO: cfg.IsLocalTSOEnabled(),
enableLocalTSO: enableLocalTSO,
member: m,
rootPath: rootPath,
saveInterval: cfg.GetTSOSaveInterval(),
updatePhysicalInterval: cfg.GetTSOUpdatePhysicalInterval(),
saveInterval: saveInterval,
updatePhysicalInterval: updatePhysicalInterval,
maxResetTSGap: maxResetTSGap,
securityConfig: cfg.GetTLSConfig(),
securityConfig: tlsConfig,
}
allocatorManager.mu.allocatorGroups = make(map[string]*allocatorGroup)
allocatorManager.mu.clusterDCLocations = make(map[string]*DCLocationInfo)
Expand Down
100 changes: 94 additions & 6 deletions pkg/tso/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,102 @@
package tso

import (
"flag"
"time"

"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/BurntSushi/toml"
"github.com/pingcap/errors"
"github.com/tikv/pd/pkg/utils/configutil"
"github.com/tikv/pd/pkg/utils/typeutil"
)

type config interface {
IsLocalTSOEnabled() bool
GetTSOSaveInterval() time.Duration
GetTSOUpdatePhysicalInterval() time.Duration
GetTLSConfig() *grpcutil.TLSConfig
const (
defaultTSOSaveInterval = 3 * time.Second
// defaultTSOUpdatePhysicalInterval is the default value of the config `TSOUpdatePhysicalInterval`.
defaultTSOUpdatePhysicalInterval = 50 * time.Millisecond
maxTSOUpdatePhysicalInterval = 10 * time.Second
minTSOUpdatePhysicalInterval = 1 * time.Millisecond
)

// Config is the configuration for the TSO.
type Config struct {
flagSet *flag.FlagSet

configFile string
// EnableLocalTSO is used to enable the Local TSO Allocator feature,
// which allows the PD server to generate Local TSO for certain DC-level transactions.
// To make this feature meaningful, user has to set the "zone" label for the PD server
// to indicate which DC this PD belongs to.
EnableLocalTSO bool `toml:"enable-local-tso" json:"enable-local-tso"`

// TSOSaveInterval is the interval to save timestamp.
TSOSaveInterval typeutil.Duration `toml:"tso-save-interval" json:"tso-save-interval"`

// The interval to update physical part of timestamp. Usually, this config should not be set.
// At most 1<<18 (262144) TSOs can be generated in the interval. The smaller the value, the
// more TSOs provided, and at the same time consuming more CPU time.
// This config is only valid in 1ms to 10s. If it's configured too long or too short, it will
// be automatically clamped to the range.
TSOUpdatePhysicalInterval typeutil.Duration `toml:"tso-update-physical-interval" json:"tso-update-physical-interval"`
}

// NewConfig creates a new config.
func NewConfig() *Config {
cfg := &Config{}
cfg.flagSet = flag.NewFlagSet("pd", flag.ContinueOnError)
fs := cfg.flagSet

fs.StringVar(&cfg.configFile, "config", "", "config file")

return cfg
}

// Parse parses flag definitions from the argument list.
func (c *Config) Parse(arguments []string) error {
// Parse first to get config file.
err := c.flagSet.Parse(arguments)
if err != nil {
return errors.WithStack(err)
}

// Load config file if specified.
var meta *toml.MetaData
if c.configFile != "" {
meta, err = configutil.ConfigFromFile(c, c.configFile)
if err != nil {
return err
}
}

// Parse again to replace with command line options.
err = c.flagSet.Parse(arguments)
if err != nil {
return errors.WithStack(err)
}

if len(c.flagSet.Args()) != 0 {
return errors.Errorf("'%s' is an invalid flag", c.flagSet.Arg(0))
}

return c.adjust(meta)
}

func (c *Config) adjust(meta *toml.MetaData) error {
configMetaData := configutil.NewConfigMetadata(meta)
if err := configMetaData.CheckUndecoded(); err != nil {
return err
}

if !meta.IsDefined("tso-save-interval") {
c.TSOSaveInterval.Duration = defaultTSOSaveInterval
}
if !meta.IsDefined("tso-update-physical-interval") {
c.TSOUpdatePhysicalInterval.Duration = defaultTSOUpdatePhysicalInterval
}
if c.TSOUpdatePhysicalInterval.Duration > maxTSOUpdatePhysicalInterval {
c.TSOUpdatePhysicalInterval.Duration = maxTSOUpdatePhysicalInterval
} else if c.TSOUpdatePhysicalInterval.Duration < minTSOUpdatePhysicalInterval {
c.TSOUpdatePhysicalInterval.Duration = minTSOUpdatePhysicalInterval
}
return nil
}
42 changes: 42 additions & 0 deletions pkg/tso/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package tso

import (
"testing"
"time"

"github.com/BurntSushi/toml"
"github.com/stretchr/testify/require"
)

func TestAdjust(t *testing.T) {
re := require.New(t)

// TSO config testings
cfgData := `
tso-update-physical-interval = "3m"
tso-save-interval = "1m"
enable-local-tso = true
`
cfg := NewConfig()
meta, err := toml.Decode(cfgData, &cfg)
re.NoError(err)
err = cfg.adjust(&meta)
re.NoError(err)

re.Equal(maxTSOUpdatePhysicalInterval, cfg.TSOUpdatePhysicalInterval.Duration)
re.Equal(1*time.Minute, cfg.TSOSaveInterval.Duration)
re.True(cfg.EnableLocalTSO)

cfgData = `
tso-update-physical-interval = "1ns"
`
cfg = NewConfig()
meta, err = toml.Decode(cfgData, &cfg)
re.NoError(err)
err = cfg.adjust(&meta)
re.NoError(err)

re.Equal(minTSOUpdatePhysicalInterval, cfg.TSOUpdatePhysicalInterval.Duration)
re.Equal(defaultTSOSaveInterval, cfg.TSOSaveInterval.Duration)
re.False(cfg.EnableLocalTSO)
}
60 changes: 60 additions & 0 deletions pkg/utils/configutil/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package configutil

import (
"errors"

"github.com/BurntSushi/toml"
)

// ConfigMetaData is the utility to test if a configuration is defined.
type ConfigMetaData struct {
meta *toml.MetaData
path []string
}

// NewConfigMetadata creates a new ConfigMetaData.
func NewConfigMetadata(meta *toml.MetaData) *ConfigMetaData {
return &ConfigMetaData{meta: meta}
}

// IsDefined checks if the key is defined in the configuration.
func (m *ConfigMetaData) IsDefined(key string) bool {
if m.meta == nil {
return false
}
keys := append([]string(nil), m.path...)
keys = append(keys, key)
return m.meta.IsDefined(keys...)
}

// Child creates a new ConfigMetaData with the path appended.
func (m *ConfigMetaData) Child(path ...string) *ConfigMetaData {
newPath := append([]string(nil), m.path...)
newPath = append(newPath, path...)
return &ConfigMetaData{
meta: m.meta,
path: newPath,
}
}

// CheckUndecoded checks if there are any undefined items in the configuration.
func (m *ConfigMetaData) CheckUndecoded() error {
if m.meta == nil {
return nil
}
undecoded := m.meta.Undecoded()
if len(undecoded) == 0 {
return nil
}
errInfo := "Config contains undefined item: "
for _, key := range undecoded {
errInfo += key.String() + ", "
}
return errors.New(errInfo[:len(errInfo)-2])
}

// ConfigFromFile loads config from file.
func ConfigFromFile(config any, path string) (*toml.MetaData, error) {
meta, err := toml.DecodeFile(path, config)
return &meta, err
}
Loading