Skip to content

Commit

Permalink
refactor tso config
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Feb 2, 2023
1 parent c5203c4 commit 0243346
Show file tree
Hide file tree
Showing 17 changed files with 128 additions and 51 deletions.
7 changes: 3 additions & 4 deletions pkg/member/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/server/config"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
"go.uber.org/zap"
Expand Down Expand Up @@ -250,12 +249,12 @@ func (m *Member) isSameLeader(leader *pdpb.Member) bool {
}

// MemberInfo initializes the member info.
func (m *Member) MemberInfo(cfg *config.Config, name string, rootPath string) {
func (m *Member) MemberInfo(advertiseClientUrls, advertisePeerUrls, name string, rootPath string) {
leader := &pdpb.Member{
Name: name,
MemberId: m.ID(),
ClientUrls: strings.Split(cfg.AdvertiseClientUrls, ","),
PeerUrls: strings.Split(cfg.AdvertisePeerUrls, ","),
ClientUrls: strings.Split(advertiseClientUrls, ","),
PeerUrls: strings.Split(advertisePeerUrls, ","),
}

data, err := leader.Marshal()
Expand Down
11 changes: 6 additions & 5 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,17 +134,18 @@ type AllocatorManager struct {
func NewAllocatorManager(
m *member.Member,
rootPath string,
cfg config,
cfg Config,
tlsConfig *grpcutil.TLSConfig,
maxResetTSGap func() time.Duration,
) *AllocatorManager {
allocatorManager := &AllocatorManager{
enableLocalTSO: cfg.IsLocalTSOEnabled(),
enableLocalTSO: cfg.EnableLocalTSO,
member: m,
rootPath: rootPath,
saveInterval: cfg.GetTSOSaveInterval(),
updatePhysicalInterval: cfg.GetTSOUpdatePhysicalInterval(),
saveInterval: cfg.SaveInterval.Duration,
updatePhysicalInterval: cfg.UpdatePhysicalInterval.Duration,
maxResetTSGap: maxResetTSGap,
securityConfig: cfg.GetTLSConfig(),
securityConfig: tlsConfig,
}
allocatorManager.mu.allocatorGroups = make(map[string]*allocatorGroup)
allocatorManager.mu.clusterDCLocations = make(map[string]*DCLocationInfo)
Expand Down
26 changes: 18 additions & 8 deletions pkg/tso/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,24 @@
package tso

import (
"time"

"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/typeutil"
)

type config interface {
IsLocalTSOEnabled() bool
GetTSOSaveInterval() time.Duration
GetTSOUpdatePhysicalInterval() time.Duration
GetTLSConfig() *grpcutil.TLSConfig
// Config is the configuration for the TSO.
type Config struct {
// EnableLocalTSO is used to enable the Local TSO Allocator feature,
// which allows the PD server to generate Local TSO for certain DC-level transactions.
// To make this feature meaningful, user has to set the "zone" label for the PD server
// to indicate which DC this PD belongs to.
EnableLocalTSO bool `toml:"enable-local-tso" json:"enable-local-tso"`

// SaveInterval is the interval to save timestamp.
SaveInterval typeutil.Duration `toml:"save-interval" json:"save-interval"`

// The interval to update physical part of timestamp. Usually, this config should not be set.
// At most 1<<18 (262144) TSOs can be generated in the interval. The smaller the value, the
// more TSOs provided, and at the same time consuming more CPU time.
// This config is only valid in 1ms to 10s. If it's configured too long or too short, it will
// be automatically clamped to the range.
UpdatePhysicalInterval typeutil.Duration `toml:"update-physical-interval" json:"update-physical-interval"`
}
2 changes: 1 addition & 1 deletion pkg/tso/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ func (t *timestampOracle) getTS(leadership *election.Leadership, count uint32, s
return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs("timestamp in memory has been reset")
}
if resp.GetLogical() >= maxLogical {
log.Warn("logical part outside of max logical interval, please check ntp time, or adjust config item `tso-update-physical-interval`",
log.Warn("logical part outside of max logical interval, please check ntp time, or adjust config item `tso.update-physical-interval`",
zap.Reflect("response", resp),
zap.Int("retry-count", i), errs.ZapError(errs.ErrLogicOverflow))
tsoCounter.WithLabelValues("logical_overflow", t.dcLocation).Inc()
Expand Down
2 changes: 1 addition & 1 deletion server/api/member_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestMemberTestSuite(t *testing.T) {

func (suite *memberTestSuite) SetupSuite() {
suite.cfgs, suite.servers, suite.clean = mustNewCluster(suite.Require(), 3, func(cfg *config.Config) {
cfg.EnableLocalTSO = true
cfg.TSOConfig.EnableLocalTSO = true
cfg.Labels = map[string]string{
config.ZoneLabel: "dc-1",
}
Expand Down
2 changes: 1 addition & 1 deletion server/api/tso_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestTSOTestSuite(t *testing.T) {
func (suite *tsoTestSuite) SetupSuite() {
re := suite.Require()
suite.svr, suite.cleanup = mustNewServer(re, func(cfg *config.Config) {
cfg.EnableLocalTSO = true
cfg.TSOConfig.EnableLocalTSO = true
cfg.Labels[config.ZoneLabel] = "dc-1"
})
server.MustWaitLeader(re, []*server.Server{suite.svr})
Expand Down
49 changes: 42 additions & 7 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/tikv/pd/pkg/core/storelimit"
"github.com/tikv/pd/pkg/encryption"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/tso"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/metricutil"
Expand Down Expand Up @@ -111,6 +112,8 @@ type Config struct {

PDServerCfg PDServerConfig `toml:"pd-server" json:"pd-server"`

TSOConfig tso.Config `toml:"tso" json:"tso"`

ClusterVersion semver.Version `toml:"cluster-version" json:"cluster-version"`

// Labels indicates the labels set for **this** PD server. The labels describe some specific properties
Expand Down Expand Up @@ -409,6 +412,27 @@ func (c *Config) Parse(arguments []string) error {
c.Log.Level = c.LogLevelDeprecated
}
}
if c.EnableLocalTSO {
msg := fmt.Sprintf("enable-local-tso in %s is deprecated, use [tso.enable-local-tso] instead", c.configFile)
c.WarningMsgs = append(c.WarningMsgs, msg)
if !meta.IsDefined("tso", "enable-local-tso") {
c.TSOConfig.EnableLocalTSO = c.EnableLocalTSO
}
}
if c.TSOSaveInterval.Duration != defaultTSOSaveInterval {
msg := fmt.Sprintf("tso-save-interval in %s is deprecated, use [tso.save-interval] instead", c.configFile)
c.WarningMsgs = append(c.WarningMsgs, msg)
if !meta.IsDefined("tso", "save-interval") {
c.TSOConfig.SaveInterval = c.TSOSaveInterval
}
}
if c.TSOUpdatePhysicalInterval.Duration != defaultTSOUpdatePhysicalInterval {
msg := fmt.Sprintf("tso-update-physical-interval in %s is deprecated, use [tso.update-physical-interval] instead", c.configFile)
c.WarningMsgs = append(c.WarningMsgs, msg)
if !meta.IsDefined("tso", "update-physical-interval") {
c.TSOConfig.UpdatePhysicalInterval = c.TSOUpdatePhysicalInterval
}
}
if meta.IsDefined("schedule", "disable-raft-learner") {
msg := fmt.Sprintf("disable-raft-learner in %s is deprecated", c.configFile)
c.WarningMsgs = append(c.WarningMsgs, msg)
Expand Down Expand Up @@ -589,6 +613,7 @@ func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error {
}

c.adjustLog(configMetaData.Child("log"))
c.adjustTSOConfig(configMetaData.Child("tso"))
adjustDuration(&c.HeartbeatStreamBindInterval, defaultHeartbeatStreamRebindInterval)

adjustDuration(&c.LeaderPriorityCheckInterval, defaultLeaderPriorityCheckInterval)
Expand Down Expand Up @@ -619,6 +644,21 @@ func (c *Config) adjustLog(meta *configMetaData) {
}
}

func (c *Config) adjustTSOConfig(meta *configMetaData) {
if !meta.IsDefined("save-interval") {
c.TSOConfig.SaveInterval.Duration = defaultTSOSaveInterval
}

if !meta.IsDefined("update-physical-interval") {
c.TSOConfig.UpdatePhysicalInterval.Duration = defaultTSOUpdatePhysicalInterval
}
if c.TSOConfig.UpdatePhysicalInterval.Duration > maxTSOUpdatePhysicalInterval {
c.TSOConfig.UpdatePhysicalInterval.Duration = maxTSOUpdatePhysicalInterval
} else if c.TSOConfig.UpdatePhysicalInterval.Duration < minTSOUpdatePhysicalInterval {
c.TSOConfig.UpdatePhysicalInterval.Duration = minTSOUpdatePhysicalInterval
}
}

// Clone returns a cloned configuration.
func (c *Config) Clone() *Config {
cfg := *c
Expand Down Expand Up @@ -1306,19 +1346,14 @@ func (c *Config) GetConfigFile() string {
return c.configFile
}

// IsLocalTSOEnabled returns if the local TSO is enabled.
func (c *Config) IsLocalTSOEnabled() bool {
return c.EnableLocalTSO
}

// GetTSOUpdatePhysicalInterval returns TSO update physical interval.
func (c *Config) GetTSOUpdatePhysicalInterval() time.Duration {
return c.TSOUpdatePhysicalInterval.Duration
return c.TSOConfig.UpdatePhysicalInterval.Duration
}

// GetTSOSaveInterval returns TSO save interval.
func (c *Config) GetTSOSaveInterval() time.Duration {
return c.TSOSaveInterval.Duration
return c.TSOConfig.SaveInterval.Duration
}

// GetTLSConfig returns the TLS config.
Expand Down
31 changes: 31 additions & 0 deletions server/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,37 @@ tso-update-physical-interval = "15s"
re.NoError(err)

re.Equal(maxTSOUpdatePhysicalInterval, cfg.TSOUpdatePhysicalInterval.Duration)

// TSO config testings
cfgData = `
[tso]
update-physical-interval = "3m"
save-interval = "1m"
enable-local-tso = true
`
cfg = NewConfig()
meta, err = toml.Decode(cfgData, &cfg)
re.NoError(err)
err = cfg.Adjust(&meta, false)
re.NoError(err)

re.Equal(maxTSOUpdatePhysicalInterval, cfg.TSOConfig.UpdatePhysicalInterval.Duration)
re.Equal(1*time.Minute, cfg.TSOConfig.SaveInterval.Duration)
re.True(cfg.TSOConfig.EnableLocalTSO)

cfgData = `
[tso]
update-physical-interval = "1ns"
`
cfg = NewConfig()
meta, err = toml.Decode(cfgData, &cfg)
re.NoError(err)
err = cfg.Adjust(&meta, false)
re.NoError(err)

re.Equal(minTSOUpdatePhysicalInterval, cfg.TSOConfig.UpdatePhysicalInterval.Duration)
re.Equal(defaultTSOSaveInterval, cfg.TSOConfig.SaveInterval.Duration)
re.False(cfg.TSOConfig.EnableLocalTSO)
}

func TestMigrateFlags(t *testing.T) {
Expand Down
10 changes: 5 additions & 5 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ func (s *Server) startServer(ctx context.Context) error {
serverInfo.WithLabelValues(versioninfo.PDReleaseVersion, versioninfo.PDGitHash).Set(float64(time.Now().Unix()))

s.rootPath = path.Join(pdRootPath, strconv.FormatUint(s.clusterID, 10))
s.member.MemberInfo(s.cfg, s.Name(), s.rootPath)
s.member.MemberInfo(s.cfg.AdvertiseClientUrls, s.cfg.AdvertisePeerUrls, s.Name(), s.rootPath)
s.member.SetMemberDeployPath(s.member.ID())
s.member.SetMemberBinaryVersion(s.member.ID(), versioninfo.PDReleaseVersion)
s.member.SetMemberGitHash(s.member.ID(), versioninfo.PDGitHash)
Expand All @@ -370,17 +370,17 @@ func (s *Server) startServer(ctx context.Context) error {
Member: s.member.MemberValue(),
})
s.tsoAllocatorManager = tso.NewAllocatorManager(
s.member, s.rootPath, s.cfg,
s.member, s.rootPath, s.cfg.TSOConfig, s.cfg.GetTLSConfig(),
func() time.Duration { return s.persistOptions.GetMaxResetTSGap() })
// Set up the Global TSO Allocator here, it will be initialized once the PD campaigns leader successfully.
s.tsoAllocatorManager.SetUpAllocator(ctx, tso.GlobalDCLocation, s.member.GetLeadership())
// When disabled the Local TSO, we should clean up the Local TSO Allocator's meta info written in etcd if it exists.
if !s.cfg.EnableLocalTSO {
if !s.cfg.TSOConfig.EnableLocalTSO {
if err = s.tsoAllocatorManager.CleanUpDCLocation(); err != nil {
return err
}
}
if zone, exist := s.cfg.Labels[config.ZoneLabel]; exist && zone != "" && s.cfg.EnableLocalTSO {
if zone, exist := s.cfg.Labels[config.ZoneLabel]; exist && zone != "" && s.cfg.TSOConfig.EnableLocalTSO {
if err = s.tsoAllocatorManager.SetLocalTSOConfig(zone); err != nil {
return err
}
Expand Down Expand Up @@ -825,7 +825,7 @@ func (s *Server) GetServiceMiddlewareConfig() *config.ServiceMiddlewareConfig {

// SetEnableLocalTSO sets enable-local-tso flag of Server. This function only for test.
func (s *Server) SetEnableLocalTSO(enableLocalTSO bool) {
s.cfg.EnableLocalTSO = enableLocalTSO
s.cfg.TSOConfig.EnableLocalTSO = enableLocalTSO
}

// GetConfig gets the config information.
Expand Down
1 change: 1 addition & 0 deletions server/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func NewTestSingleConfig(c *assertutil.Checker) *config.Config {
TSOSaveInterval: typeutil.NewDuration(200 * time.Millisecond),
}

cfg.TSOConfig.SaveInterval = typeutil.NewDuration(200 * time.Millisecond)
cfg.AdvertiseClientUrls = cfg.ClientUrls
cfg.AdvertisePeerUrls = cfg.PeerUrls
cfg.DataDir, _ = os.MkdirTemp("/tmp", "test_pd")
Expand Down
6 changes: 3 additions & 3 deletions tests/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func TestTSOAllocatorLeader(t *testing.T) {
}
dcLocationNum := len(dcLocationConfig)
cluster, err := tests.NewTestCluster(ctx, dcLocationNum, func(conf *config.Config, serverName string) {
conf.EnableLocalTSO = true
conf.TSOConfig.EnableLocalTSO = true
conf.Labels[config.ZoneLabel] = dcLocationConfig[serverName]
})
re.NoError(err)
Expand Down Expand Up @@ -421,7 +421,7 @@ func TestGlobalAndLocalTSO(t *testing.T) {
}
dcLocationNum := len(dcLocationConfig)
cluster, err := tests.NewTestCluster(ctx, dcLocationNum, func(conf *config.Config, serverName string) {
conf.EnableLocalTSO = true
conf.TSOConfig.EnableLocalTSO = true
conf.Labels[config.ZoneLabel] = dcLocationConfig[serverName]
})
re.NoError(err)
Expand All @@ -435,7 +435,7 @@ func TestGlobalAndLocalTSO(t *testing.T) {

// Join a new dc-location
pd4, err := cluster.Join(ctx, func(conf *config.Config, serverName string) {
conf.EnableLocalTSO = true
conf.TSOConfig.EnableLocalTSO = true
conf.Labels[config.ZoneLabel] = "dc-4"
})
re.NoError(err)
Expand Down
2 changes: 1 addition & 1 deletion tests/server/member/member_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestMemberDelete(t *testing.T) {
}
dcLocationNum := len(dcLocationConfig)
cluster, err := tests.NewTestCluster(ctx, dcLocationNum, func(conf *config.Config, serverName string) {
conf.EnableLocalTSO = true
conf.TSOConfig.EnableLocalTSO = true
conf.Labels[config.ZoneLabel] = dcLocationConfig[serverName]
})
defer cluster.Destroy()
Expand Down
6 changes: 3 additions & 3 deletions tests/server/tso/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestAllocatorLeader(t *testing.T) {
dcLocationNum := len(dcLocationConfig)
cluster, err := tests.NewTestCluster(ctx, dcLocationNum*2, func(conf *config.Config, serverName string) {
if zoneLabel, ok := dcLocationConfig[serverName]; ok {
conf.EnableLocalTSO = true
conf.TSOConfig.EnableLocalTSO = true
conf.Labels[config.ZoneLabel] = zoneLabel
}
})
Expand Down Expand Up @@ -119,7 +119,7 @@ func TestPriorityAndDifferentLocalTSO(t *testing.T) {
}
dcLocationNum := len(dcLocationConfig)
cluster, err := tests.NewTestCluster(ctx, dcLocationNum, func(conf *config.Config, serverName string) {
conf.EnableLocalTSO = true
conf.TSOConfig.EnableLocalTSO = true
conf.Labels[config.ZoneLabel] = dcLocationConfig[serverName]
})
defer cluster.Destroy()
Expand All @@ -133,7 +133,7 @@ func TestPriorityAndDifferentLocalTSO(t *testing.T) {

// Join a new dc-location
pd4, err := cluster.Join(ctx, func(conf *config.Config, serverName string) {
conf.EnableLocalTSO = true
conf.TSOConfig.EnableLocalTSO = true
conf.Labels[config.ZoneLabel] = "dc-4"
})
re.NoError(err)
Expand Down
Loading

0 comments on commit 0243346

Please sign in to comment.