
resource_manager: implement the independent primary election #6086

Merged: 7 commits, Mar 6, 2023
Showing changes from 2 commits
12 changes: 11 additions & 1 deletion pkg/mcs/resource_manager/server/config.go
@@ -32,13 +32,15 @@ import (

const (
defaultName = "Resource Manager"
defaultBackendEndpoints = "127.0.0.1:2379"
defaultBackendEndpoints = "http://127.0.0.1:3379"
Review comment (Member): maybe conflict with tso's defaultListenAddr

Review comment (Member): BTW, I think 2379 is better.

defaultListenAddr = "127.0.0.1:3380"
defaultEnableGRPCGateway = true

defaultLogFormat = "text"
defaultDisableErrorVerbose = true

+ defaultLeaderLease = int64(3)

defaultReadBaseCost = 0.25
defaultWriteBaseCost = 1
// 1 RU = 64 KiB read bytes
@@ -66,6 +68,12 @@ type Config struct {

Security configutil.SecurityConfig `toml:"security" json:"security"`

// LeaderLease defines the time within which a Resource Manager primary/leader must
// update its TTL in etcd, otherwise etcd will expire the leader key and other servers
// can campaign the primary/leader again. etcd only supports TTLs in seconds, so this
// value is in seconds as well.
LeaderLease int64 `toml:"lease" json:"lease"`

// RequestUnit is the configuration that determines the coefficients of the RRU and WRU cost.
// This configuration should be modified carefully.
RequestUnit RequestUnitConfig
@@ -180,6 +188,8 @@ func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error {
c.Log.Format = defaultLogFormat
}

configutil.AdjustInt64(&c.LeaderLease, defaultLeaderLease)

c.RequestUnit.Adjust()

return nil
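For context on the new `lease` option: the TTL contract the LeaderLease comment describes is a plain etcd lease. Below is a minimal standalone sketch of that contract, assuming a local etcd reachable at 127.0.0.1:2379 and a hypothetical key name; it is illustrative only, not code from this PR.

```go
package main

import (
	"context"
	"log"
	"time"

	"go.etcd.io/etcd/clientv3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"http://127.0.0.1:2379"}, // assumed local etcd
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	ctx := context.Background()
	// Grant a 3-second lease, mirroring defaultLeaderLease above.
	lease, err := cli.Grant(ctx, 3)
	if err != nil {
		log.Fatal(err)
	}
	// Bind a leader key to the lease: if the holder stops refreshing the
	// lease, etcd deletes the key and another server can campaign.
	if _, err := cli.Put(ctx, "/sketch/primary", "server-1", clientv3.WithLease(lease.ID)); err != nil {
		log.Fatal(err)
	}
	// KeepAlive refreshes the TTL in the background; this is the "update its
	// TTL in etcd" obligation that LeaderLease bounds.
	if _, err := cli.KeepAlive(ctx, lease.ID); err != nil {
		log.Fatal(err)
	}
}
```

If the KeepAlive stream stops (crash, network partition), etcd expires the key once the TTL lapses, which is exactly what frees the other servers to campaign.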
10 changes: 10 additions & 0 deletions pkg/mcs/resource_manager/server/metrics.go
@@ -20,6 +20,7 @@ import (

const (
namespace = "resource_manager"
serverSubsystem = "server"
ruSubsystem = "resource_unit"
resourceSubsystem = "resource"
resourceGroupNameLabel = "name"
@@ -29,6 +30,14 @@ const (
)

var (
// Meta & Server info.
serverInfo = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: serverSubsystem,
Name: "info",
Help: "Indicate the resource manager server info, and the value is the start timestamp (s).",
}, []string{"version", "hash"})
// RU cost metrics.
readRequestUnitCost = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
@@ -97,6 +106,7 @@ var (
)

func init() {
prometheus.MustRegister(serverInfo)
prometheus.MustRegister(readRequestUnitCost)
prometheus.MustRegister(writeRequestUnitCost)
prometheus.MustRegister(sqlLayerRequestUnitCost)
166 changes: 133 additions & 33 deletions pkg/mcs/resource_manager/server/server.go
@@ -17,6 +17,7 @@ package server
import (
"context"
"crypto/tls"
"fmt"
"net"
"net/http"
"net/url"
@@ -35,8 +36,10 @@ import (
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/discovery"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/memberutil"
"github.com/tikv/pd/pkg/utils/metricutil"
"github.com/tikv/pd/pkg/versioninfo"
"go.etcd.io/etcd/clientv3"
@@ -46,30 +49,36 @@
)

const (
tcp = "tcp"
tcpNetworkStr = "tcp"
// resourceManagerKeyspaceGroupPrimaryElectionPrefix defines the key prefix for keyspace group primary election.
// The entire key is in the format of "/pd/<cluster-id>/microservice/resource_manager/keyspace-group-XXXXX/primary"
// in which XXXXX is a 5-digit integer with leading zeros. For now, we use 0 as the default cluster ID.
resourceManagerKeyspaceGroupPrimaryElectionPrefix = "/pd/0/microservice/resource_manager/keyspace-group-"
Review comment (Member): Will the resource manager have the concept of a keyspace group?

Reply (Member, author): Yes, we assume that in the future we may separate the resource manager service into several instances to serve different keyspace groups.

// defaultGRPCGracefulStopTimeout is the default timeout to wait for grpc server to gracefully stop
defaultGRPCGracefulStopTimeout = 5 * time.Second
// defaultHTTPGracefulShutdownTimeout is the default timeout to wait for http server to gracefully shutdown
defaultHTTPGracefulShutdownTimeout = 5 * time.Second
defaultLeaseInSeconds = 3
leaderTickInterval = 50 * time.Millisecond
)
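To make the key layout concrete, here is a tiny illustration (not code from the PR) of how the full election path described by the comment above is assembled; keyspace group 0 is the default and currently only group:

```go
package main

import "fmt"

const resourceManagerKeyspaceGroupPrimaryElectionPrefix = "/pd/0/microservice/resource_manager/keyspace-group-"

func main() {
	// %05d pads the keyspace group ID to the 5-digit form the comment describes.
	key := resourceManagerKeyspaceGroupPrimaryElectionPrefix + fmt.Sprintf("%05d", 0) + "/primary"
	fmt.Println(key) // /pd/0/microservice/resource_manager/keyspace-group-00000/primary
}
```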

// Server is the resource manager server, and it implements bs.Server.
// nolint
type Server struct {
// Server state. 0 is not serving, 1 is serving.
isServing int64

- ctx              context.Context
- serverLoopWg     sync.WaitGroup
+ ctx              context.Context
+ serverLoopCtx    context.Context
+ serverLoopCancel func()
+ serverLoopWg     sync.WaitGroup

- cfg         *Config
- name        string
- backendURLs []url.URL
- listenURL   *url.URL
+ cfg  *Config
+ name string

- etcdClient *clientv3.Client
- httpClient *http.Client
+ // for the primary election of resource manager
+ participant *member.Participant
+ etcdClient  *clientv3.Client
+ httpClient  *http.Client

muxListener net.Listener
service *Service
@@ -83,7 +92,7 @@ type Server struct {
serviceRegister *discovery.ServiceRegister
}

- // Name returns the unique etcd Name for this server in etcd cluster.
+ // Name returns the unique etcd name for this server in the etcd cluster.
func (s *Server) Name() string {
return s.name
}
@@ -93,12 +102,101 @@ func (s *Server) Context() context.Context {
return s.ctx
}

- // Run runs the pd server.
- func (s *Server) Run() error {
- if err := s.initClient(); err != nil {
+ // Run runs the Resource Manager server.
+ func (s *Server) Run() (err error) {
+ if err = s.initClient(); err != nil {
return err
}
- return s.startServer()
+ if err = s.startServer(); err != nil {
+ return err
+ }
+
+ s.startServerLoop()
+
+ return nil
}

func (s *Server) startServerLoop() {
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
s.serverLoopWg.Add(1)
go s.primaryElectionLoop()
}

func (s *Server) primaryElectionLoop() {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()

for {
if s.IsClosed() {
log.Info("server is closed, exit resource manager primary election loop")
return
}

primary, rev, checkAgain := s.participant.CheckLeader()
if checkAgain {
continue
}
if primary != nil {
log.Info("start to watch the primary/leader", zap.Stringer("resource-manager-primary", primary))
// WatchLeader will keep looping and never return unless the primary/leader has changed.
s.participant.WatchLeader(s.serverLoopCtx, primary, rev)
log.Info("the resource manager primary/leader has changed, try to re-campaign a primary/leader")
}

s.campaignLeader()
}
}

func (s *Server) campaignLeader() {
log.Info("start to campaign the primary/leader", zap.String("campaign-resource-manager-primary-name", s.participant.Member().Name))
if err := s.participant.CampaignLeader(s.cfg.LeaderLease); err != nil {
if err.Error() == errs.ErrEtcdTxnConflict.Error() {
log.Info("campaign resource manager primary/leader meets error due to txn conflict, another resource manager server may campaign successfully",
zap.String("campaign-resource-manager-primary-name", s.participant.Member().Name))
} else {
log.Error("campaign resource manager primary/leader meets error due to etcd error",
zap.String("campaign-resource-manager-primary-name", s.participant.Member().Name),
errs.ZapError(err))
}
return
}

// Start to keep the leadership alive and enable the Resource Manager service.
ctx, cancel := context.WithCancel(s.serverLoopCtx)
var resetLeaderOnce sync.Once
defer resetLeaderOnce.Do(func() {
cancel()
s.participant.ResetLeader()
})

// Maintain the leadership; after this, the Resource Manager could be ready to provide service.
s.participant.KeepLeader(ctx)
log.Info("campaign resource manager primary ok", zap.String("campaign-resource-manager-primary-name", s.participant.Member().Name))

log.Info("triggering the primary callback functions")
for _, cb := range s.primaryCallbacks {
cb(ctx)
}

s.participant.EnableLeader()
log.Info("resource manager primary is ready to serve", zap.String("resource-manager-primary-name", s.participant.Member().Name))

leaderTicker := time.NewTicker(leaderTickInterval)
defer leaderTicker.Stop()

for {
select {
case <-leaderTicker.C:
if !s.participant.IsLeader() {
log.Info("no longer a primary/leader because lease has expired, the resource manager primary/leader will step down")
return
}
case <-ctx.Done():
// Server is closed and it should return nil.
log.Info("server is closed")
return
}
}
}
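The loop above leans entirely on member.Participant. Inferred from the call sites alone (the real declaration lives in pkg/member and may differ), the subset of its contract used here looks roughly like the sketch below; the Leader stand-in type is hypothetical:

```go
package sketch

import (
	"context"
	"fmt"
)

// Leader is a placeholder for the concrete member type; being passed to
// zap.Stringer in the logs above implies it satisfies fmt.Stringer.
type Leader interface {
	fmt.Stringer
}

// electionParticipant summarizes the member.Participant methods used by the
// election loop (an inferred sketch, not the actual interface).
type electionParticipant interface {
	// CheckLeader returns the current primary (nil if none), the etcd
	// revision to start watching from, and whether to simply retry the check.
	CheckLeader() (primary Leader, rev int64, checkAgain bool)
	// WatchLeader blocks until the given primary steps down or ctx ends.
	WatchLeader(ctx context.Context, primary Leader, rev int64)
	// CampaignLeader tries to create the leader key under a lease with the
	// given TTL in seconds; it fails if another participant already holds it.
	CampaignLeader(leaseSeconds int64) error
	// KeepLeader refreshes the lease until ctx is canceled.
	KeepLeader(ctx context.Context)
	EnableLeader()
	ResetLeader()
	IsLeader() bool
}
```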

// Close closes the server.
@@ -111,6 +209,7 @@ func (s *Server) Close() {
log.Info("closing resource manager server ...")
s.serviceRegister.Deregister()
s.muxListener.Close()
s.serverLoopCancel()
s.serverLoopWg.Wait()

if s.etcdClient != nil {
@@ -148,8 +247,7 @@ func (s *Server) AddStartCallback(callbacks ...func()) {

// IsServing returns whether the server is the leader, if there is embedded etcd, or the primary otherwise.
func (s *Server) IsServing() bool {
- // TODO: implement this function with primary.
- return atomic.LoadInt64(&s.isServing) == 1
+ return s.participant.IsLeader()
}

// IsClosed checks if the server loop is closed
Expand All @@ -171,8 +269,7 @@ func (s *Server) initClient() error {
if err != nil {
return err
}
- s.backendURLs = []url.URL(u)
- s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, s.backendURLs)
+ s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, []url.URL(u))
return err
}

@@ -253,29 +350,36 @@ func (s *Server) startGRPCAndHTTPServers(l net.Listener) {

// GetPrimary returns the primary member.
func (s *Server) GetPrimary() bs.MemberProvider {
- // TODO: implement this function with primary.
- return nil
+ return s.participant.GetLeader()
}

func (s *Server) startServer() error {
- manager := NewManager[*Server](s)
+ // The independent Resource Manager service still reuses PD version info since PD and Resource Manager are just
+ // different service modes provided by the same pd-server binary.
+ serverInfo.WithLabelValues(versioninfo.PDReleaseVersion, versioninfo.PDGitHash).Set(float64(time.Now().Unix()))
+
+ uniqueName := s.cfg.ListenAddr
+ uniqueID := memberutil.GenerateUniqueID(uniqueName)
+ log.Info("joining primary election", zap.String("participant-name", uniqueName), zap.Uint64("participant-id", uniqueID))
+ s.participant = member.NewParticipant(s.etcdClient, uniqueID)
+ s.participant.InitInfo(uniqueName, resourceManagerKeyspaceGroupPrimaryElectionPrefix+fmt.Sprintf("%05d", 0), "primary", "keyspace group primary election")
+ s.participant.SetMemberDeployPath(s.participant.ID())
+ s.participant.SetMemberBinaryVersion(s.participant.ID(), versioninfo.PDReleaseVersion)
+ s.participant.SetMemberGitHash(s.participant.ID(), versioninfo.PDGitHash)
+
s.service = &Service{
ctx: s.ctx,
- manager: manager,
+ manager: NewManager[*Server](s),
}

tlsConfig, err := s.cfg.Security.ToTLSConfig()
if err != nil {
return err
}
- s.listenURL, err = url.Parse(s.cfg.ListenAddr)
- if err != nil {
- return err
- }
if tlsConfig != nil {
- s.muxListener, err = tls.Listen(tcp, s.listenURL.Host, tlsConfig)
+ s.muxListener, err = tls.Listen(tcpNetworkStr, s.cfg.ListenAddr, tlsConfig)
} else {
- s.muxListener, err = net.Listen(tcp, s.listenURL.Host)
+ s.muxListener, err = net.Listen(tcpNetworkStr, s.cfg.ListenAddr)
}
if err != nil {
return err
@@ -289,10 +393,6 @@ func (s *Server) startServer() error {
for _, cb := range s.startCallbacks {
cb()
}
- // TODO: resolve callback for the primary
- for _, cb := range s.primaryCallbacks {
- cb(s.ctx)
- }

// Server has started.
atomic.StoreInt64(&s.isServing, 1)
60 changes: 60 additions & 0 deletions pkg/mcs/resource_manager/server/testutil.go
@@ -0,0 +1,60 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"context"
"os"

"github.com/pingcap/log"
"github.com/spf13/cobra"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/utils/logutil"
)

// CleanupFunc closes test resource manager server(s) and deletes any files left behind.
type CleanupFunc func()

// NewTestServer creates a resource manager server for testing.
func NewTestServer(ctx context.Context, re *require.Assertions, cfg *Config) (*Server, CleanupFunc, error) {
// New zap logger
err := logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog)
re.NoError(err)
log.ReplaceGlobals(cfg.Logger, cfg.LogProps)
// Flushing any buffered log entries
defer log.Sync()

s := NewServer(ctx, cfg)
if err = s.Run(); err != nil {
return nil, nil, err
}

cleanup := func() {
s.Close()
os.RemoveAll(cfg.DataDir)
}
return s, cleanup, nil
}

// NewTestDefaultConfig creates a new default config for testing.
func NewTestDefaultConfig() (*Config, error) {
cmd := &cobra.Command{
Use: "resource_manager",
Short: "Run the resource manager service",
}
cfg := NewConfig()
flagSet := cmd.Flags()
return cfg, cfg.Parse(flagSet)
}
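A sketch of how a test might drive these helpers (a hypothetical test; it assumes a PD/etcd backend is reachable at the config's backend endpoints):

```go
package server

import (
	"context"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

func TestPrimaryElection(t *testing.T) {
	re := require.New(t)
	cfg, err := NewTestDefaultConfig()
	re.NoError(err)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	svr, cleanup, err := NewTestServer(ctx, re, cfg)
	re.NoError(err)
	defer cleanup()

	// With a single participant, this server should win the primary
	// election shortly after it starts.
	deadline := time.Now().Add(10 * time.Second)
	for !svr.IsServing() && time.Now().Before(deadline) {
		time.Sleep(100 * time.Millisecond)
	}
	re.True(svr.IsServing())
}
```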