Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resource_manager: implement the independent primary election #6086

Merged
merged 7 commits into from
Mar 6, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion pkg/mcs/resource_manager/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@ import (

const (
defaultName = "Resource Manager"
defaultBackendEndpoints = "127.0.0.1:2379"
defaultBackendEndpoints = "http://127.0.0.1:3379"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe conflict with tso's defaultListenAddr

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, I think 2379 is better.

defaultListenAddr = "127.0.0.1:3380"
defaultEnableGRPCGateway = true

defaultLogFormat = "text"
defaultDisableErrorVerbose = true

defaultLeaderLease = int64(3)

defaultReadBaseCost = 0.25
defaultWriteBaseCost = 1
// 1 RU = 64 KiB read bytes
Expand Down Expand Up @@ -186,6 +188,8 @@ func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error {
c.Log.Format = defaultLogFormat
}

configutil.AdjustInt64(&c.LeaderLease, defaultLeaderLease)

c.RequestUnit.Adjust()

return nil
Expand Down
10 changes: 10 additions & 0 deletions pkg/mcs/resource_manager/server/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

const (
namespace = "resource_manager"
serverSubsystem = "server"
ruSubsystem = "resource_unit"
resourceSubsystem = "resource"
resourceGroupNameLabel = "name"
Expand All @@ -29,6 +30,14 @@ const (
)

var (
// Meta & Server info.
serverInfo = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: serverSubsystem,
Name: "info",
Help: "Indicate the resource manager server info, and the value is the start timestamp (s).",
}, []string{"version", "hash"})
// RU cost metrics.
readRequestUnitCost = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Expand Down Expand Up @@ -97,6 +106,7 @@ var (
)

func init() {
prometheus.MustRegister(serverInfo)
prometheus.MustRegister(readRequestUnitCost)
prometheus.MustRegister(writeRequestUnitCost)
prometheus.MustRegister(sqlLayerRequestUnitCost)
Expand Down
47 changes: 29 additions & 18 deletions pkg/mcs/resource_manager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package server
import (
"context"
"crypto/tls"
"fmt"
"net"
"net/http"
"net/url"
Expand All @@ -38,6 +39,7 @@ import (
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/memberutil"
"github.com/tikv/pd/pkg/utils/metricutil"
"github.com/tikv/pd/pkg/versioninfo"
"go.etcd.io/etcd/clientv3"
Expand All @@ -47,13 +49,17 @@ import (
)

const (
leaderTickInterval = 50 * time.Millisecond
tcpNetworkStr = "tcp"
tcpNetworkStr = "tcp"
// resourceManagerKeyspaceGroupPrimaryElectionPrefix defines the key prefix for keyspace group primary election.
// The entire key is in the format of "/pd/<cluster-id>/microservice/resource_manager/keyspace-group-XXXXX/primary"
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
// in which XXXXX is 5 digits integer with leading zeros. For now we use 0 as the default cluster id.
resourceManagerKeyspaceGroupPrimaryElectionPrefix = "/pd/0/microservice/resource_manager/keyspace-group-"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will the resource manager have the concept of a keyspace group?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, in the assumption of the future, we may separate the resource manager service into several to serve different keyspace groups.

JmPotato marked this conversation as resolved.
Show resolved Hide resolved
// defaultGRPCGracefulStopTimeout is the default timeout to wait for grpc server to gracefully stop
defaultGRPCGracefulStopTimeout = 5 * time.Second
// defaultHTTPGracefulShutdownTimeout is the default timeout to wait for http server to gracefully shutdown
defaultHTTPGracefulShutdownTimeout = 5 * time.Second
defaultLeaseInSeconds = 3
leaderTickInterval = 50 * time.Millisecond
)

// Server is the resource manager server, and it implements bs.Server.
Expand All @@ -66,10 +72,8 @@ type Server struct {
serverLoopCancel func()
serverLoopWg sync.WaitGroup

cfg *Config
name string
backendURLs []url.URL
listenURL *url.URL
cfg *Config
name string

// for the primary election of resource manager
participant *member.Participant
Expand Down Expand Up @@ -205,6 +209,7 @@ func (s *Server) Close() {
log.Info("closing resource manager server ...")
s.serviceRegister.Deregister()
s.muxListener.Close()
s.serverLoopCancel()
s.serverLoopWg.Wait()

if s.etcdClient != nil {
Expand Down Expand Up @@ -242,8 +247,7 @@ func (s *Server) AddStartCallback(callbacks ...func()) {

// IsServing returns whether the server is the leader, if there is embedded etcd, or the primary otherwise.
func (s *Server) IsServing() bool {
// TODO: implement this function with primary.
return atomic.LoadInt64(&s.isServing) == 1
return s.participant.IsLeader()
}

// IsClosed checks if the server loop is closed
Expand All @@ -265,8 +269,7 @@ func (s *Server) initClient() error {
if err != nil {
return err
}
s.backendURLs = []url.URL(u)
s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, s.backendURLs)
s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, []url.URL(u))
return err
}

Expand Down Expand Up @@ -347,11 +350,23 @@ func (s *Server) startGRPCAndHTTPServers(l net.Listener) {

// GetPrimary returns the primary member.
func (s *Server) GetPrimary() bs.MemberProvider {
// TODO: implement this function with primary.
return nil
return s.participant.GetLeader()
}

func (s *Server) startServer() error {
// The independent Resource Manager service still reuses PD version info since PD and Resource Manager are just
// different service modes provided by the same pd-server binary
serverInfo.WithLabelValues(versioninfo.PDReleaseVersion, versioninfo.PDGitHash).Set(float64(time.Now().Unix()))

uniqueName := s.cfg.ListenAddr
uniqueID := memberutil.GenerateUniqueID(uniqueName)
log.Info("joining primary election", zap.String("participant-name", uniqueName), zap.Uint64("participant-id", uniqueID))
s.participant = member.NewParticipant(s.etcdClient, uniqueID)
s.participant.InitInfo(uniqueName, resourceManagerKeyspaceGroupPrimaryElectionPrefix+fmt.Sprintf("%05d", 0), "primary", "keyspace group primary election")
s.participant.SetMemberDeployPath(s.participant.ID())
s.participant.SetMemberBinaryVersion(s.participant.ID(), versioninfo.PDReleaseVersion)
s.participant.SetMemberGitHash(s.participant.ID(), versioninfo.PDGitHash)

s.service = &Service{
ctx: s.ctx,
manager: NewManager[*Server](s),
Expand All @@ -361,14 +376,10 @@ func (s *Server) startServer() error {
if err != nil {
return err
}
s.listenURL, err = url.Parse(s.cfg.ListenAddr)
if err != nil {
return err
}
if tlsConfig != nil {
s.muxListener, err = tls.Listen(tcpNetworkStr, s.listenURL.Host, tlsConfig)
s.muxListener, err = tls.Listen(tcpNetworkStr, s.cfg.ListenAddr, tlsConfig)
} else {
s.muxListener, err = net.Listen(tcpNetworkStr, s.listenURL.Host)
s.muxListener, err = net.Listen(tcpNetworkStr, s.cfg.ListenAddr)
}
if err != nil {
return err
Expand Down
60 changes: 60 additions & 0 deletions pkg/mcs/resource_manager/server/testutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"context"
"os"

"github.com/pingcap/log"
"github.com/spf13/cobra"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/utils/logutil"
)

// CleanupFunc closes test resource manager server(s) and deletes any files left behind.
type CleanupFunc func()

// NewTestServer creates a resource manager server for testing.
func NewTestServer(ctx context.Context, re *require.Assertions, cfg *Config) (*Server, CleanupFunc, error) {
// New zap logger
err := logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog)
re.NoError(err)
log.ReplaceGlobals(cfg.Logger, cfg.LogProps)
// Flushing any buffered log entries
defer log.Sync()

s := NewServer(ctx, cfg)
if err = s.Run(); err != nil {
return nil, nil, err
}

cleanup := func() {
s.Close()
os.RemoveAll(cfg.DataDir)
}
return s, cleanup, nil
}

// NewTestDefaultConfig creates a new default config for testing.
func NewTestDefaultConfig() (*Config, error) {
cmd := &cobra.Command{
Use: "resource_manager",
Short: "Run the resource manager service",
}
cfg := NewConfig()
flagSet := cmd.Flags()
return cfg, cfg.Parse(flagSet)
}
10 changes: 4 additions & 6 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ package server

import (
"context"
"crypto/sha256"
"crypto/tls"
"encoding/binary"
"fmt"
"math/rand"
"net"
Expand Down Expand Up @@ -50,6 +48,7 @@ import (
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/memberutil"
"github.com/tikv/pd/pkg/utils/metricutil"
"github.com/tikv/pd/pkg/utils/tsoutil"
"github.com/tikv/pd/pkg/versioninfo"
Expand Down Expand Up @@ -311,7 +310,7 @@ func (s *Server) AddStartCallback(callbacks ...func()) {
s.startCallbacks = append(s.startCallbacks, callbacks...)
}

// IsServing implments basicserver. It returns whether the server is the leader
// IsServing implements basicserver. It returns whether the server is the leader
// if there is embedded etcd, or the primary otherwise.
func (s *Server) IsServing() bool {
return s.participant.IsLeader()
Expand Down Expand Up @@ -554,8 +553,7 @@ func (s *Server) startServer() (err error) {
// TODO: Figure out how we should generated the unique id and name passed to Participant.
// For now, set the name to be listen address and generate the unique id from the name with sha256.
uniqueName := s.cfg.ListenAddr
hash := sha256.Sum256([]byte(uniqueName))
uniqueID := binary.LittleEndian.Uint64(hash[:8])
uniqueID := memberutil.GenerateUniqueID(uniqueName)
log.Info("joining primary election", zap.String("participant-name", uniqueName), zap.Uint64("participant-id", uniqueID))

s.participant = member.NewParticipant(s.etcdClient, uniqueID)
Expand Down Expand Up @@ -601,7 +599,7 @@ func (s *Server) startServer() (err error) {
return nil
}

// CreateServer creats the Server
// CreateServer creates the Server
func CreateServer(ctx context.Context, cfg *Config) *Server {
rand.New(rand.NewSource(time.Now().UnixNano()))
svr := &Server{
Expand Down
27 changes: 27 additions & 0 deletions pkg/utils/memberutil/member.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package memberutil

import (
"crypto/sha256"
"encoding/binary"
)

// GenerateUniqueID generates a unique ID based on the given seed.
// This is used to generate a unique ID for a member.
func GenerateUniqueID(seed string) uint64 {
hash := sha256.Sum256([]byte(seed))
return binary.LittleEndian.Uint64(hash[:8])
}
36 changes: 18 additions & 18 deletions tests/mcs/resource_manager/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,21 @@ func TestResourceManagerServer(t *testing.T) {
leaderName := cluster.WaitLeader()
leader := cluster.GetServer(leaderName)

cfg := rm.NewConfig()
cfg, err := rm.NewTestDefaultConfig()
re.NoError(err)
cfg.BackendEndpoints = leader.GetAddr()
cfg.ListenAddr = tempurl.Alloc()
svr := rm.NewServer(ctx, cfg)
go svr.Run()
listenAddr := tempurl.Alloc()
cfg.ListenAddr = strings.TrimPrefix(listenAddr, "http://")

s, cleanup, err := rm.NewTestServer(ctx, re, cfg)
re.NoError(err)
defer cleanup()
testutil.Eventually(re, func() bool {
return svr.IsServing()
return s.IsServing()
}, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond))
defer svr.Close()

// Test registered GRPC Service
cc, err := grpc.DialContext(ctx, strings.TrimPrefix(cfg.ListenAddr, "http://"), grpc.WithInsecure())
cc, err := grpc.DialContext(ctx, cfg.ListenAddr, grpc.WithInsecure())
re.NoError(err)
defer cc.Close()
c := rmpb.NewResourceManagerClient(cc)
Expand All @@ -69,7 +72,7 @@ func TestResourceManagerServer(t *testing.T) {
re.ErrorContains(err, "resource group not found")

// Test registered REST HTTP Handler
url := cfg.ListenAddr + "/resource-manager/api/v1/config"
url := listenAddr + "/resource-manager/api/v1/config"
{
resp, err := http.Get(url + "/groups")
re.NoError(err)
Expand Down Expand Up @@ -117,23 +120,20 @@ func TestResourceManagerRegister(t *testing.T) {
leaderName := cluster.WaitLeader()
leader := cluster.GetServer(leaderName)

cfg := rm.NewConfig()
cfg, err := rm.NewTestDefaultConfig()
re.NoError(err)
cfg.BackendEndpoints = leader.GetAddr()
cfg.ListenAddr = tempurl.Alloc()
cfg.ListenAddr = strings.TrimPrefix(tempurl.Alloc(), "http://")

svr := rm.NewServer(ctx, cfg)
go svr.Run()
s, cleanup, err := rm.NewTestServer(ctx, re, cfg)
re.NoError(err)
defer cleanup()
testutil.Eventually(re, func() bool {
return svr.IsServing()
return s.IsServing()
}, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond))

client := leader.GetEtcdClient()
endpoints, err := discovery.Discover(client, "resource_manager")
re.NoError(err)
re.Equal(cfg.ListenAddr, endpoints[0])

svr.Close()
endpoints, err = discovery.Discover(client, "resource_manager")
re.NoError(err)
re.Empty(endpoints)
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
}