Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client/sd: unify the service discovery callbacks within a struct #9014

Merged
merged 3 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions client/clients/tso/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,8 @@ func NewClient(
},
}

eventSrc := svcDiscovery.(sd.TSOEventSource)
eventSrc.SetTSOLeaderURLUpdatedCallback(c.updateTSOLeaderURL)
c.svcDiscovery.AddServiceURLsSwitchedCallback(c.scheduleUpdateTSOConnectionCtxs)
c.svcDiscovery.ExecAndAddLeaderSwitchedCallback(c.updateTSOLeaderURL)
c.svcDiscovery.AddMembersChangedCallback(c.scheduleUpdateTSOConnectionCtxs)

return c
}
Expand Down
5 changes: 3 additions & 2 deletions client/inner_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,12 @@ func (c *innerClient) resetTSOClientLocked(mode pdpb.ServiceMode) {
}
}

func (c *innerClient) scheduleUpdateTokenConnection() {
func (c *innerClient) scheduleUpdateTokenConnection(string) error {
select {
case c.updateTokenConnectionCh <- struct{}{}:
default:
}
return nil
}

func (c *innerClient) getServiceMode() pdpb.ServiceMode {
Expand Down Expand Up @@ -188,7 +189,7 @@ func (c *innerClient) setup() error {
}

// Register callbacks
c.serviceDiscovery.AddServingURLSwitchedCallback(c.scheduleUpdateTokenConnection)
c.serviceDiscovery.AddLeaderSwitchedCallback(c.scheduleUpdateTokenConnection)

// Create dispatchers
c.createTokenDispatcher()
Expand Down
104 changes: 104 additions & 0 deletions client/servicediscovery/callbacks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright 2025 TiKV Project Authors.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about moving this file to pkg?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its code is highly related with the service discovery, pkg is for those components which are more common.

//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package servicediscovery

import (
"sync"

"github.com/pingcap/kvproto/pkg/pdpb"
)

type leaderSwitchedCallbackFunc func(string) error

// serviceCallbacks contains all the callback functions for service discovery events
type serviceCallbacks struct {
sync.RWMutex
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this lock necessary?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This lock will be used in the next PR.

// serviceModeUpdateCb will be called when the service mode gets updated
serviceModeUpdateCb func(pdpb.ServiceMode)
// leaderSwitchedCbs will be called after the leader switched
leaderSwitchedCbs []leaderSwitchedCallbackFunc
// membersChangedCbs will be called after there is any membership change in the
// leader and followers
membersChangedCbs []func()
}

func newServiceCallbacks() *serviceCallbacks {
return &serviceCallbacks{
leaderSwitchedCbs: make([]leaderSwitchedCallbackFunc, 0),
membersChangedCbs: make([]func(), 0),
}
}

func (c *serviceCallbacks) setServiceModeUpdateCallback(cb func(pdpb.ServiceMode)) {
c.Lock()
defer c.Unlock()
c.serviceModeUpdateCb = cb
}

func (c *serviceCallbacks) addLeaderSwitchedCallback(cb leaderSwitchedCallbackFunc) {
c.Lock()
defer c.Unlock()
c.leaderSwitchedCbs = append(c.leaderSwitchedCbs, cb)
}

func (c *serviceCallbacks) addMembersChangedCallback(cb func()) {
c.Lock()
defer c.Unlock()
c.membersChangedCbs = append(c.membersChangedCbs, cb)
}

func (c *serviceCallbacks) onServiceModeUpdate(mode pdpb.ServiceMode) {
c.RLock()
cb := c.serviceModeUpdateCb
c.RUnlock()

if cb == nil {
return
}
cb(mode)
}

func (c *serviceCallbacks) onLeaderSwitched(leader string) error {
c.RLock()
cbs := make([]leaderSwitchedCallbackFunc, len(c.leaderSwitchedCbs))
copy(cbs, c.leaderSwitchedCbs)
c.RUnlock()

var err error
for _, cb := range cbs {
if cb == nil {
continue

Check warning on line 82 in client/servicediscovery/callbacks.go

View check run for this annotation

Codecov / codecov/patch

client/servicediscovery/callbacks.go#L82

Added line #L82 was not covered by tests
}
err = cb(leader)
if err != nil {
return err
}

Check warning on line 87 in client/servicediscovery/callbacks.go

View check run for this annotation

Codecov / codecov/patch

client/servicediscovery/callbacks.go#L86-L87

Added lines #L86 - L87 were not covered by tests
}
return nil
}

func (c *serviceCallbacks) onMembersChanged() {
c.RLock()
cbs := make([]func(), len(c.membersChangedCbs))
copy(cbs, c.membersChangedCbs)
c.RUnlock()

for _, cb := range cbs {
if cb == nil {
continue

Check warning on line 100 in client/servicediscovery/callbacks.go

View check run for this annotation

Codecov / codecov/patch

client/servicediscovery/callbacks.go#L100

Added line #L100 was not covered by tests
}
cb()
}
}
11 changes: 7 additions & 4 deletions client/servicediscovery/mock_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,11 @@
// CheckMemberChanged implements the ServiceDiscovery interface.
func (*mockServiceDiscovery) CheckMemberChanged() error { return nil }

// AddServingURLSwitchedCallback implements the ServiceDiscovery interface.
func (*mockServiceDiscovery) AddServingURLSwitchedCallback(...func()) {}
// ExecAndAddLeaderSwitchedCallback implements the ServiceDiscovery interface.
func (*mockServiceDiscovery) ExecAndAddLeaderSwitchedCallback(leaderSwitchedCallbackFunc) {}

Check warning on line 104 in client/servicediscovery/mock_service_discovery.go

View check run for this annotation

Codecov / codecov/patch

client/servicediscovery/mock_service_discovery.go#L104

Added line #L104 was not covered by tests

// AddServiceURLsSwitchedCallback implements the ServiceDiscovery interface.
func (*mockServiceDiscovery) AddServiceURLsSwitchedCallback(...func()) {}
// AddLeaderSwitchedCallback implements the ServiceDiscovery interface.
func (*mockServiceDiscovery) AddLeaderSwitchedCallback(leaderSwitchedCallbackFunc) {}

Check warning on line 107 in client/servicediscovery/mock_service_discovery.go

View check run for this annotation

Codecov / codecov/patch

client/servicediscovery/mock_service_discovery.go#L107

Added line #L107 was not covered by tests

// AddMembersChangedCallback implements the ServiceDiscovery interface.
func (*mockServiceDiscovery) AddMembersChangedCallback(func()) {}

Check warning on line 110 in client/servicediscovery/mock_service_discovery.go

View check run for this annotation

Codecov / codecov/patch

client/servicediscovery/mock_service_discovery.go#L110

Added line #L110 was not covered by tests
92 changes: 34 additions & 58 deletions client/servicediscovery/service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,16 @@
// CheckMemberChanged immediately check if there is any membership change among the leader/followers
// in a quorum-based cluster or among the primary/secondaries in a primary/secondary configured cluster.
CheckMemberChanged() error
// AddServingURLSwitchedCallback adds callbacks which will be called when the leader
// ExecAndAddLeaderSwitchedCallback executes the callback once and adds it to the callback list then.
ExecAndAddLeaderSwitchedCallback(cb leaderSwitchedCallbackFunc)
// AddLeaderSwitchedCallback adds callbacks which will be called when the leader
// in a quorum-based cluster or the primary in a primary/secondary configured cluster
// is switched.
AddServingURLSwitchedCallback(callbacks ...func())
// AddServiceURLsSwitchedCallback adds callbacks which will be called when any leader/follower
AddLeaderSwitchedCallback(cb leaderSwitchedCallbackFunc)
// AddMembersChangedCallback adds callbacks which will be called when any leader/follower
// in a quorum-based cluster or any primary/secondary in a primary/secondary configured cluster
// is changed.
AddServiceURLsSwitchedCallback(callbacks ...func())
AddMembersChangedCallback(cb func())
}

// ServiceClient is an interface that defines a set of operations for a raw PD gRPC client to specific PD server.
Expand Down Expand Up @@ -394,18 +396,8 @@

// UpdateKeyspaceIDFunc is the function type for updating the keyspace ID.
type UpdateKeyspaceIDFunc func() error
type tsoLeaderURLUpdatedFunc func(string) error

// TSOEventSource subscribes to events related to changes in the TSO leader/primary from the service discovery.
type TSOEventSource interface {
// SetTSOLeaderURLUpdatedCallback adds a callback which will be called when the TSO leader/primary is updated.
SetTSOLeaderURLUpdatedCallback(callback tsoLeaderURLUpdatedFunc)
}

var (
_ ServiceDiscovery = (*serviceDiscovery)(nil)
_ TSOEventSource = (*serviceDiscovery)(nil)
)
var _ ServiceDiscovery = (*serviceDiscovery)(nil)

// serviceDiscovery is the service discovery client of PD/PD service which is quorum based
type serviceDiscovery struct {
Expand All @@ -426,15 +418,7 @@
// url -> a gRPC connection
clientConns sync.Map // Store as map[string]*grpc.ClientConn

// serviceModeUpdateCb will be called when the service mode gets updated
serviceModeUpdateCb func(pdpb.ServiceMode)
// leaderSwitchedCbs will be called after the leader switched
leaderSwitchedCbs []func()
// membersChangedCbs will be called after there is any membership change in the
// leader and followers
membersChangedCbs []func()
// tsoLeaderUpdatedCb will be called when the TSO leader is updated.
tsoLeaderUpdatedCb tsoLeaderURLUpdatedFunc
callbacks *serviceCallbacks

checkMembershipCh chan struct{}

Expand Down Expand Up @@ -474,12 +458,13 @@
cancel: cancel,
wg: wg,
apiCandidateNodes: [apiKindCount]*serviceBalancer{newServiceBalancer(emptyErrorFn), newServiceBalancer(regionAPIErrorFn)},
serviceModeUpdateCb: serviceModeUpdateCb,
callbacks: newServiceCallbacks(),
updateKeyspaceIDFunc: updateKeyspaceIDFunc,
keyspaceID: keyspaceID,
tlsCfg: tlsCfg,
option: option,
}
pdsd.callbacks.setServiceModeUpdateCallback(serviceModeUpdateCb)
urls = tlsutil.AddrsToURLs(urls, tlsCfg)
pdsd.urls.Store(urls)
return pdsd
Expand Down Expand Up @@ -570,7 +555,7 @@
failpoint.Return()
})
failpoint.Inject("usePDServiceMode", func() {
c.serviceModeUpdateCb(pdpb.ServiceMode_PD_SVC_MODE)
c.callbacks.onServiceModeUpdate(pdpb.ServiceMode_PD_SVC_MODE)
failpoint.Return()
})

Expand Down Expand Up @@ -791,27 +776,29 @@
return c.updateMember()
}

// AddServingURLSwitchedCallback adds callbacks which will be called
// when the leader is switched.
func (c *serviceDiscovery) AddServingURLSwitchedCallback(callbacks ...func()) {
c.leaderSwitchedCbs = append(c.leaderSwitchedCbs, callbacks...)
}

// AddServiceURLsSwitchedCallback adds callbacks which will be called when
// any leader/follower is changed.
func (c *serviceDiscovery) AddServiceURLsSwitchedCallback(callbacks ...func()) {
c.membersChangedCbs = append(c.membersChangedCbs, callbacks...)
}

// SetTSOLeaderURLUpdatedCallback adds a callback which will be called when the TSO leader is updated.
func (c *serviceDiscovery) SetTSOLeaderURLUpdatedCallback(callback tsoLeaderURLUpdatedFunc) {
// ExecAndAddLeaderSwitchedCallback executes the callback once and adds it to the callback list then.
func (c *serviceDiscovery) ExecAndAddLeaderSwitchedCallback(callback leaderSwitchedCallbackFunc) {
url := c.getLeaderURL()
if len(url) > 0 {
if err := callback(url); err != nil {
log.Error("[tso] failed to call back when tso leader url update", zap.String("url", url), errs.ZapError(err))
log.Error("[pd] failed to run a callback with the current leader url",
zap.String("url", url), errs.ZapError(err))

Check warning on line 785 in client/servicediscovery/service_discovery.go

View check run for this annotation

Codecov / codecov/patch

client/servicediscovery/service_discovery.go#L784-L785

Added lines #L784 - L785 were not covered by tests
}
}
c.tsoLeaderUpdatedCb = callback
c.AddLeaderSwitchedCallback(callback)
}

// AddLeaderSwitchedCallback adds callbacks which will be called when the leader
// in a quorum-based cluster or the primary in a primary/secondary configured cluster
// is switched.
func (c *serviceDiscovery) AddLeaderSwitchedCallback(callback leaderSwitchedCallbackFunc) {
c.callbacks.addLeaderSwitchedCallback(callback)
}

// AddMembersChangedCallback adds callbacks which will be called when any primary/secondary
// in a primary/secondary configured cluster is changed.
func (c *serviceDiscovery) AddMembersChangedCallback(callback func()) {
c.callbacks.addMembersChangedCallback(callback)
}

// getLeaderURL returns the leader URL.
Expand Down Expand Up @@ -867,19 +854,15 @@
// If the method is not supported, we set it to pd mode.
// TODO: it's a hack way to solve the compatibility issue.
// we need to remove this after all maintained version supports the method.
if c.serviceModeUpdateCb != nil {
c.serviceModeUpdateCb(pdpb.ServiceMode_PD_SVC_MODE)
}
c.callbacks.onServiceModeUpdate(pdpb.ServiceMode_PD_SVC_MODE)

Check warning on line 857 in client/servicediscovery/service_discovery.go

View check run for this annotation

Codecov / codecov/patch

client/servicediscovery/service_discovery.go#L857

Added line #L857 was not covered by tests
return nil
}
return err
}
if clusterInfo == nil || len(clusterInfo.ServiceModes) == 0 {
return errors.WithStack(errs.ErrNoServiceModeReturned)
}
if c.serviceModeUpdateCb != nil {
c.serviceModeUpdateCb(clusterInfo.ServiceModes[0])
}
c.callbacks.onServiceModeUpdate(clusterInfo.ServiceModes[0])
return nil
}

Expand Down Expand Up @@ -968,9 +951,7 @@
}
c.urls.Store(urls)
// Run callbacks to reflect the membership changes in the leader and followers.
for _, cb := range c.membersChangedCbs {
cb()
}
c.callbacks.onMembersChanged()
log.Info("[pd] update member urls", zap.Strings("old-urls", oldURLs), zap.Strings("new-urls", urls))
}

Expand All @@ -987,13 +968,8 @@
c.leader.Store(leaderClient)
}
// Run callbacks
if c.tsoLeaderUpdatedCb != nil {
if err := c.tsoLeaderUpdatedCb(url); err != nil {
return true, err
}
}
for _, cb := range c.leaderSwitchedCbs {
cb()
if err := c.callbacks.onLeaderSwitched(url); err != nil {
return true, err

Check warning on line 972 in client/servicediscovery/service_discovery.go

View check run for this annotation

Codecov / codecov/patch

client/servicediscovery/service_discovery.go#L972

Added line #L972 was not covered by tests
}
log.Info("[pd] switch leader", zap.String("new-leader", url), zap.String("old-leader", oldLeader.GetURL()))
return true, err
Expand Down
3 changes: 2 additions & 1 deletion client/servicediscovery/service_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ func TestUpdateURLs(t *testing.T) {
}
return
}
cli := &serviceDiscovery{option: opt.NewOption()}
cli := &serviceDiscovery{callbacks: newServiceCallbacks(), option: opt.NewOption()}
cli.urls.Store([]string{})
cli.updateURLs(members[1:])
re.Equal(getURLs([]*pdpb.Member{members[1], members[3], members[2]}), cli.GetServiceURLs())
Expand All @@ -422,6 +422,7 @@ func TestGRPCDialOption(t *testing.T) {
ctx, cancel := context.WithTimeout(context.TODO(), 500*time.Millisecond)
defer cancel()
cli := &serviceDiscovery{
callbacks: newServiceCallbacks(),
checkMembershipCh: make(chan struct{}, 1),
ctx: ctx,
cancel: cancel,
Expand Down
Loading
Loading