Skip to content

Commit

Permalink
client/sd: unify the service discovery callbacks within a struct (#9014)
Browse files Browse the repository at this point in the history
ref #8690

Unify the service discovery callbacks within a struct.

Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato authored Jan 22, 2025
1 parent b66703c commit ae6df14
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 84 deletions.
5 changes: 2 additions & 3 deletions client/clients/tso/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,8 @@ func NewClient(
},
}

eventSrc := svcDiscovery.(sd.TSOEventSource)
eventSrc.SetTSOLeaderURLUpdatedCallback(c.updateTSOLeaderURL)
c.svcDiscovery.AddServiceURLsSwitchedCallback(c.scheduleUpdateTSOConnectionCtxs)
c.svcDiscovery.ExecAndAddLeaderSwitchedCallback(c.updateTSOLeaderURL)
c.svcDiscovery.AddMembersChangedCallback(c.scheduleUpdateTSOConnectionCtxs)

return c
}
Expand Down
5 changes: 3 additions & 2 deletions client/inner_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,12 @@ func (c *innerClient) resetTSOClientLocked(mode pdpb.ServiceMode) {
}
}

func (c *innerClient) scheduleUpdateTokenConnection() {
func (c *innerClient) scheduleUpdateTokenConnection(string) error {
select {
case c.updateTokenConnectionCh <- struct{}{}:
default:
}
return nil
}

func (c *innerClient) getServiceMode() pdpb.ServiceMode {
Expand Down Expand Up @@ -188,7 +189,7 @@ func (c *innerClient) setup() error {
}

// Register callbacks
c.serviceDiscovery.AddServingURLSwitchedCallback(c.scheduleUpdateTokenConnection)
c.serviceDiscovery.AddLeaderSwitchedCallback(c.scheduleUpdateTokenConnection)

// Create dispatchers
c.createTokenDispatcher()
Expand Down
104 changes: 104 additions & 0 deletions client/servicediscovery/callbacks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright 2025 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package servicediscovery

import (
"sync"

"github.com/pingcap/kvproto/pkg/pdpb"
)

type leaderSwitchedCallbackFunc func(string) error

// serviceCallbacks contains all the callback functions for service discovery events
type serviceCallbacks struct {
sync.RWMutex
// serviceModeUpdateCb will be called when the service mode gets updated
serviceModeUpdateCb func(pdpb.ServiceMode)
// leaderSwitchedCbs will be called after the leader switched
leaderSwitchedCbs []leaderSwitchedCallbackFunc
// membersChangedCbs will be called after there is any membership change in the
// leader and followers
membersChangedCbs []func()
}

func newServiceCallbacks() *serviceCallbacks {
return &serviceCallbacks{
leaderSwitchedCbs: make([]leaderSwitchedCallbackFunc, 0),
membersChangedCbs: make([]func(), 0),
}
}

func (c *serviceCallbacks) setServiceModeUpdateCallback(cb func(pdpb.ServiceMode)) {
c.Lock()
defer c.Unlock()
c.serviceModeUpdateCb = cb
}

func (c *serviceCallbacks) addLeaderSwitchedCallback(cb leaderSwitchedCallbackFunc) {
c.Lock()
defer c.Unlock()
c.leaderSwitchedCbs = append(c.leaderSwitchedCbs, cb)
}

func (c *serviceCallbacks) addMembersChangedCallback(cb func()) {
c.Lock()
defer c.Unlock()
c.membersChangedCbs = append(c.membersChangedCbs, cb)
}

func (c *serviceCallbacks) onServiceModeUpdate(mode pdpb.ServiceMode) {
c.RLock()
cb := c.serviceModeUpdateCb
c.RUnlock()

if cb == nil {
return
}
cb(mode)
}

func (c *serviceCallbacks) onLeaderSwitched(leader string) error {
c.RLock()
cbs := make([]leaderSwitchedCallbackFunc, len(c.leaderSwitchedCbs))
copy(cbs, c.leaderSwitchedCbs)
c.RUnlock()

var err error
for _, cb := range cbs {
if cb == nil {
continue
}
err = cb(leader)
if err != nil {
return err
}
}
return nil
}

func (c *serviceCallbacks) onMembersChanged() {
c.RLock()
cbs := make([]func(), len(c.membersChangedCbs))
copy(cbs, c.membersChangedCbs)
c.RUnlock()

for _, cb := range cbs {
if cb == nil {
continue
}
cb()
}
}
11 changes: 7 additions & 4 deletions client/servicediscovery/mock_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,11 @@ func (*mockServiceDiscovery) ScheduleCheckMemberChanged() {}
// CheckMemberChanged implements the ServiceDiscovery interface.
func (*mockServiceDiscovery) CheckMemberChanged() error { return nil }

// AddServingURLSwitchedCallback implements the ServiceDiscovery interface.
func (*mockServiceDiscovery) AddServingURLSwitchedCallback(...func()) {}
// ExecAndAddLeaderSwitchedCallback implements the ServiceDiscovery interface.
func (*mockServiceDiscovery) ExecAndAddLeaderSwitchedCallback(leaderSwitchedCallbackFunc) {}

// AddServiceURLsSwitchedCallback implements the ServiceDiscovery interface.
func (*mockServiceDiscovery) AddServiceURLsSwitchedCallback(...func()) {}
// AddLeaderSwitchedCallback implements the ServiceDiscovery interface.
func (*mockServiceDiscovery) AddLeaderSwitchedCallback(leaderSwitchedCallbackFunc) {}

// AddMembersChangedCallback implements the ServiceDiscovery interface.
func (*mockServiceDiscovery) AddMembersChangedCallback(func()) {}
92 changes: 34 additions & 58 deletions client/servicediscovery/service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,16 @@ type ServiceDiscovery interface {
// CheckMemberChanged immediately check if there is any membership change among the leader/followers
// in a quorum-based cluster or among the primary/secondaries in a primary/secondary configured cluster.
CheckMemberChanged() error
// AddServingURLSwitchedCallback adds callbacks which will be called when the leader
// ExecAndAddLeaderSwitchedCallback executes the callback once and adds it to the callback list then.
ExecAndAddLeaderSwitchedCallback(cb leaderSwitchedCallbackFunc)
// AddLeaderSwitchedCallback adds callbacks which will be called when the leader
// in a quorum-based cluster or the primary in a primary/secondary configured cluster
// is switched.
AddServingURLSwitchedCallback(callbacks ...func())
// AddServiceURLsSwitchedCallback adds callbacks which will be called when any leader/follower
AddLeaderSwitchedCallback(cb leaderSwitchedCallbackFunc)
// AddMembersChangedCallback adds callbacks which will be called when any leader/follower
// in a quorum-based cluster or any primary/secondary in a primary/secondary configured cluster
// is changed.
AddServiceURLsSwitchedCallback(callbacks ...func())
AddMembersChangedCallback(cb func())
}

// ServiceClient is an interface that defines a set of operations for a raw PD gRPC client to specific PD server.
Expand Down Expand Up @@ -394,18 +396,8 @@ func (c *serviceBalancer) get() (ret ServiceClient) {

// UpdateKeyspaceIDFunc is the function type for updating the keyspace ID.
type UpdateKeyspaceIDFunc func() error
type tsoLeaderURLUpdatedFunc func(string) error

// TSOEventSource subscribes to events related to changes in the TSO leader/primary from the service discovery.
type TSOEventSource interface {
// SetTSOLeaderURLUpdatedCallback adds a callback which will be called when the TSO leader/primary is updated.
SetTSOLeaderURLUpdatedCallback(callback tsoLeaderURLUpdatedFunc)
}

var (
_ ServiceDiscovery = (*serviceDiscovery)(nil)
_ TSOEventSource = (*serviceDiscovery)(nil)
)
var _ ServiceDiscovery = (*serviceDiscovery)(nil)

// serviceDiscovery is the service discovery client of PD/PD service which is quorum based
type serviceDiscovery struct {
Expand All @@ -426,15 +418,7 @@ type serviceDiscovery struct {
// url -> a gRPC connection
clientConns sync.Map // Store as map[string]*grpc.ClientConn

// serviceModeUpdateCb will be called when the service mode gets updated
serviceModeUpdateCb func(pdpb.ServiceMode)
// leaderSwitchedCbs will be called after the leader switched
leaderSwitchedCbs []func()
// membersChangedCbs will be called after there is any membership change in the
// leader and followers
membersChangedCbs []func()
// tsoLeaderUpdatedCb will be called when the TSO leader is updated.
tsoLeaderUpdatedCb tsoLeaderURLUpdatedFunc
callbacks *serviceCallbacks

checkMembershipCh chan struct{}

Expand Down Expand Up @@ -474,12 +458,13 @@ func NewServiceDiscovery(
cancel: cancel,
wg: wg,
apiCandidateNodes: [apiKindCount]*serviceBalancer{newServiceBalancer(emptyErrorFn), newServiceBalancer(regionAPIErrorFn)},
serviceModeUpdateCb: serviceModeUpdateCb,
callbacks: newServiceCallbacks(),
updateKeyspaceIDFunc: updateKeyspaceIDFunc,
keyspaceID: keyspaceID,
tlsCfg: tlsCfg,
option: option,
}
pdsd.callbacks.setServiceModeUpdateCallback(serviceModeUpdateCb)
urls = tlsutil.AddrsToURLs(urls, tlsCfg)
pdsd.urls.Store(urls)
return pdsd
Expand Down Expand Up @@ -570,7 +555,7 @@ func (c *serviceDiscovery) updateServiceModeLoop() {
failpoint.Return()
})
failpoint.Inject("usePDServiceMode", func() {
c.serviceModeUpdateCb(pdpb.ServiceMode_PD_SVC_MODE)
c.callbacks.onServiceModeUpdate(pdpb.ServiceMode_PD_SVC_MODE)
failpoint.Return()
})

Expand Down Expand Up @@ -791,27 +776,29 @@ func (c *serviceDiscovery) CheckMemberChanged() error {
return c.updateMember()
}

// AddServingURLSwitchedCallback adds callbacks which will be called
// when the leader is switched.
func (c *serviceDiscovery) AddServingURLSwitchedCallback(callbacks ...func()) {
c.leaderSwitchedCbs = append(c.leaderSwitchedCbs, callbacks...)
}

// AddServiceURLsSwitchedCallback adds callbacks which will be called when
// any leader/follower is changed.
func (c *serviceDiscovery) AddServiceURLsSwitchedCallback(callbacks ...func()) {
c.membersChangedCbs = append(c.membersChangedCbs, callbacks...)
}

// SetTSOLeaderURLUpdatedCallback adds a callback which will be called when the TSO leader is updated.
func (c *serviceDiscovery) SetTSOLeaderURLUpdatedCallback(callback tsoLeaderURLUpdatedFunc) {
// ExecAndAddLeaderSwitchedCallback executes the callback once and adds it to the callback list then.
func (c *serviceDiscovery) ExecAndAddLeaderSwitchedCallback(callback leaderSwitchedCallbackFunc) {
url := c.getLeaderURL()
if len(url) > 0 {
if err := callback(url); err != nil {
log.Error("[tso] failed to call back when tso leader url update", zap.String("url", url), errs.ZapError(err))
log.Error("[pd] failed to run a callback with the current leader url",
zap.String("url", url), errs.ZapError(err))
}
}
c.tsoLeaderUpdatedCb = callback
c.AddLeaderSwitchedCallback(callback)
}

// AddLeaderSwitchedCallback adds callbacks which will be called when the leader
// in a quorum-based cluster or the primary in a primary/secondary configured cluster
// is switched.
func (c *serviceDiscovery) AddLeaderSwitchedCallback(callback leaderSwitchedCallbackFunc) {
c.callbacks.addLeaderSwitchedCallback(callback)
}

// AddMembersChangedCallback adds callbacks which will be called when any primary/secondary
// in a primary/secondary configured cluster is changed.
func (c *serviceDiscovery) AddMembersChangedCallback(callback func()) {
c.callbacks.addMembersChangedCallback(callback)
}

// getLeaderURL returns the leader URL.
Expand Down Expand Up @@ -867,19 +854,15 @@ func (c *serviceDiscovery) checkServiceModeChanged() error {
// If the method is not supported, we set it to pd mode.
// TODO: it's a hack way to solve the compatibility issue.
// we need to remove this after all maintained version supports the method.
if c.serviceModeUpdateCb != nil {
c.serviceModeUpdateCb(pdpb.ServiceMode_PD_SVC_MODE)
}
c.callbacks.onServiceModeUpdate(pdpb.ServiceMode_PD_SVC_MODE)
return nil
}
return err
}
if clusterInfo == nil || len(clusterInfo.ServiceModes) == 0 {
return errors.WithStack(errs.ErrNoServiceModeReturned)
}
if c.serviceModeUpdateCb != nil {
c.serviceModeUpdateCb(clusterInfo.ServiceModes[0])
}
c.callbacks.onServiceModeUpdate(clusterInfo.ServiceModes[0])
return nil
}

Expand Down Expand Up @@ -968,9 +951,7 @@ func (c *serviceDiscovery) updateURLs(members []*pdpb.Member) {
}
c.urls.Store(urls)
// Run callbacks to reflect the membership changes in the leader and followers.
for _, cb := range c.membersChangedCbs {
cb()
}
c.callbacks.onMembersChanged()
log.Info("[pd] update member urls", zap.Strings("old-urls", oldURLs), zap.Strings("new-urls", urls))
}

Expand All @@ -987,13 +968,8 @@ func (c *serviceDiscovery) switchLeader(url string) (bool, error) {
c.leader.Store(leaderClient)
}
// Run callbacks
if c.tsoLeaderUpdatedCb != nil {
if err := c.tsoLeaderUpdatedCb(url); err != nil {
return true, err
}
}
for _, cb := range c.leaderSwitchedCbs {
cb()
if err := c.callbacks.onLeaderSwitched(url); err != nil {
return true, err
}
log.Info("[pd] switch leader", zap.String("new-leader", url), zap.String("old-leader", oldLeader.GetURL()))
return true, err
Expand Down
3 changes: 2 additions & 1 deletion client/servicediscovery/service_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ func TestUpdateURLs(t *testing.T) {
}
return
}
cli := &serviceDiscovery{option: opt.NewOption()}
cli := &serviceDiscovery{callbacks: newServiceCallbacks(), option: opt.NewOption()}
cli.urls.Store([]string{})
cli.updateURLs(members[1:])
re.Equal(getURLs([]*pdpb.Member{members[1], members[3], members[2]}), cli.GetServiceURLs())
Expand All @@ -422,6 +422,7 @@ func TestGRPCDialOption(t *testing.T) {
ctx, cancel := context.WithTimeout(context.TODO(), 500*time.Millisecond)
defer cancel()
cli := &serviceDiscovery{
callbacks: newServiceCallbacks(),
checkMembershipCh: make(chan struct{}, 1),
ctx: ctx,
cancel: cancel,
Expand Down
Loading

0 comments on commit ae6df14

Please sign in to comment.