From ae6df14874d4deeba5713bbb6e390c82c1ab7ef3 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Wed, 22 Jan 2025 11:41:23 +0800 Subject: [PATCH] client/sd: unify the service discovery callbacks within a struct (#9014) ref tikv/pd#8690 Unify the service discovery callbacks within a struct. Signed-off-by: JmPotato --- client/clients/tso/client.go | 5 +- client/inner_client.go | 5 +- client/servicediscovery/callbacks.go | 104 ++++++++++++++++++ .../mock_service_discovery.go | 11 +- client/servicediscovery/service_discovery.go | 92 ++++++---------- .../service_discovery_test.go | 3 +- .../servicediscovery/tso_service_discovery.go | 27 ++--- tests/integrations/client/client_test.go | 3 +- 8 files changed, 166 insertions(+), 84 deletions(-) create mode 100644 client/servicediscovery/callbacks.go diff --git a/client/clients/tso/client.go b/client/clients/tso/client.go index 7bc768ee21b..8af2890750e 100644 --- a/client/clients/tso/client.go +++ b/client/clients/tso/client.go @@ -116,9 +116,8 @@ func NewClient( }, } - eventSrc := svcDiscovery.(sd.TSOEventSource) - eventSrc.SetTSOLeaderURLUpdatedCallback(c.updateTSOLeaderURL) - c.svcDiscovery.AddServiceURLsSwitchedCallback(c.scheduleUpdateTSOConnectionCtxs) + c.svcDiscovery.ExecAndAddLeaderSwitchedCallback(c.updateTSOLeaderURL) + c.svcDiscovery.AddMembersChangedCallback(c.scheduleUpdateTSOConnectionCtxs) return c } diff --git a/client/inner_client.go b/client/inner_client.go index cff5b95ed54..de13bb324f5 100644 --- a/client/inner_client.go +++ b/client/inner_client.go @@ -143,11 +143,12 @@ func (c *innerClient) resetTSOClientLocked(mode pdpb.ServiceMode) { } } -func (c *innerClient) scheduleUpdateTokenConnection() { +func (c *innerClient) scheduleUpdateTokenConnection(string) error { select { case c.updateTokenConnectionCh <- struct{}{}: default: } + return nil } func (c *innerClient) getServiceMode() pdpb.ServiceMode { @@ -188,7 +189,7 @@ func (c *innerClient) setup() error { } // Register callbacks - c.serviceDiscovery.AddServingURLSwitchedCallback(c.scheduleUpdateTokenConnection) + c.serviceDiscovery.AddLeaderSwitchedCallback(c.scheduleUpdateTokenConnection) // Create dispatchers c.createTokenDispatcher() diff --git a/client/servicediscovery/callbacks.go b/client/servicediscovery/callbacks.go new file mode 100644 index 00000000000..c21217f7e44 --- /dev/null +++ b/client/servicediscovery/callbacks.go @@ -0,0 +1,104 @@ +// Copyright 2025 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package servicediscovery + +import ( + "sync" + + "github.com/pingcap/kvproto/pkg/pdpb" +) + +type leaderSwitchedCallbackFunc func(string) error + +// serviceCallbacks contains all the callback functions for service discovery events +type serviceCallbacks struct { + sync.RWMutex + // serviceModeUpdateCb will be called when the service mode gets updated + serviceModeUpdateCb func(pdpb.ServiceMode) + // leaderSwitchedCbs will be called after the leader switched + leaderSwitchedCbs []leaderSwitchedCallbackFunc + // membersChangedCbs will be called after there is any membership change in the + // leader and followers + membersChangedCbs []func() +} + +func newServiceCallbacks() *serviceCallbacks { + return &serviceCallbacks{ + leaderSwitchedCbs: make([]leaderSwitchedCallbackFunc, 0), + membersChangedCbs: make([]func(), 0), + } +} + +func (c *serviceCallbacks) setServiceModeUpdateCallback(cb func(pdpb.ServiceMode)) { + c.Lock() + defer c.Unlock() + c.serviceModeUpdateCb = cb +} + +func (c *serviceCallbacks) addLeaderSwitchedCallback(cb leaderSwitchedCallbackFunc) { + c.Lock() + defer c.Unlock() + c.leaderSwitchedCbs = append(c.leaderSwitchedCbs, cb) +} + +func (c *serviceCallbacks) addMembersChangedCallback(cb func()) { + c.Lock() + defer c.Unlock() + c.membersChangedCbs = append(c.membersChangedCbs, cb) +} + +func (c *serviceCallbacks) onServiceModeUpdate(mode pdpb.ServiceMode) { + c.RLock() + cb := c.serviceModeUpdateCb + c.RUnlock() + + if cb == nil { + return + } + cb(mode) +} + +func (c *serviceCallbacks) onLeaderSwitched(leader string) error { + c.RLock() + cbs := make([]leaderSwitchedCallbackFunc, len(c.leaderSwitchedCbs)) + copy(cbs, c.leaderSwitchedCbs) + c.RUnlock() + + var err error + for _, cb := range cbs { + if cb == nil { + continue + } + err = cb(leader) + if err != nil { + return err + } + } + return nil +} + +func (c *serviceCallbacks) onMembersChanged() { + c.RLock() + cbs := make([]func(), len(c.membersChangedCbs)) + copy(cbs, c.membersChangedCbs) + c.RUnlock() + + for _, cb := range cbs { + if cb == nil { + continue + } + cb() + } +} diff --git a/client/servicediscovery/mock_service_discovery.go b/client/servicediscovery/mock_service_discovery.go index 6ca649f4575..362626243f7 100644 --- a/client/servicediscovery/mock_service_discovery.go +++ b/client/servicediscovery/mock_service_discovery.go @@ -100,8 +100,11 @@ func (*mockServiceDiscovery) ScheduleCheckMemberChanged() {} // CheckMemberChanged implements the ServiceDiscovery interface. func (*mockServiceDiscovery) CheckMemberChanged() error { return nil } -// AddServingURLSwitchedCallback implements the ServiceDiscovery interface. -func (*mockServiceDiscovery) AddServingURLSwitchedCallback(...func()) {} +// ExecAndAddLeaderSwitchedCallback implements the ServiceDiscovery interface. +func (*mockServiceDiscovery) ExecAndAddLeaderSwitchedCallback(leaderSwitchedCallbackFunc) {} -// AddServiceURLsSwitchedCallback implements the ServiceDiscovery interface. -func (*mockServiceDiscovery) AddServiceURLsSwitchedCallback(...func()) {} +// AddLeaderSwitchedCallback implements the ServiceDiscovery interface. +func (*mockServiceDiscovery) AddLeaderSwitchedCallback(leaderSwitchedCallbackFunc) {} + +// AddMembersChangedCallback implements the ServiceDiscovery interface. +func (*mockServiceDiscovery) AddMembersChangedCallback(func()) {} diff --git a/client/servicediscovery/service_discovery.go b/client/servicediscovery/service_discovery.go index f5ac665b7cd..1a6b8a7043f 100644 --- a/client/servicediscovery/service_discovery.go +++ b/client/servicediscovery/service_discovery.go @@ -126,14 +126,16 @@ type ServiceDiscovery interface { // CheckMemberChanged immediately check if there is any membership change among the leader/followers // in a quorum-based cluster or among the primary/secondaries in a primary/secondary configured cluster. CheckMemberChanged() error - // AddServingURLSwitchedCallback adds callbacks which will be called when the leader + // ExecAndAddLeaderSwitchedCallback executes the callback once and adds it to the callback list then. + ExecAndAddLeaderSwitchedCallback(cb leaderSwitchedCallbackFunc) + // AddLeaderSwitchedCallback adds callbacks which will be called when the leader // in a quorum-based cluster or the primary in a primary/secondary configured cluster // is switched. - AddServingURLSwitchedCallback(callbacks ...func()) - // AddServiceURLsSwitchedCallback adds callbacks which will be called when any leader/follower + AddLeaderSwitchedCallback(cb leaderSwitchedCallbackFunc) + // AddMembersChangedCallback adds callbacks which will be called when any leader/follower // in a quorum-based cluster or any primary/secondary in a primary/secondary configured cluster // is changed. - AddServiceURLsSwitchedCallback(callbacks ...func()) + AddMembersChangedCallback(cb func()) } // ServiceClient is an interface that defines a set of operations for a raw PD gRPC client to specific PD server. @@ -394,18 +396,8 @@ func (c *serviceBalancer) get() (ret ServiceClient) { // UpdateKeyspaceIDFunc is the function type for updating the keyspace ID. type UpdateKeyspaceIDFunc func() error -type tsoLeaderURLUpdatedFunc func(string) error -// TSOEventSource subscribes to events related to changes in the TSO leader/primary from the service discovery. -type TSOEventSource interface { - // SetTSOLeaderURLUpdatedCallback adds a callback which will be called when the TSO leader/primary is updated. - SetTSOLeaderURLUpdatedCallback(callback tsoLeaderURLUpdatedFunc) -} - -var ( - _ ServiceDiscovery = (*serviceDiscovery)(nil) - _ TSOEventSource = (*serviceDiscovery)(nil) -) +var _ ServiceDiscovery = (*serviceDiscovery)(nil) // serviceDiscovery is the service discovery client of PD/PD service which is quorum based type serviceDiscovery struct { @@ -426,15 +418,7 @@ type serviceDiscovery struct { // url -> a gRPC connection clientConns sync.Map // Store as map[string]*grpc.ClientConn - // serviceModeUpdateCb will be called when the service mode gets updated - serviceModeUpdateCb func(pdpb.ServiceMode) - // leaderSwitchedCbs will be called after the leader switched - leaderSwitchedCbs []func() - // membersChangedCbs will be called after there is any membership change in the - // leader and followers - membersChangedCbs []func() - // tsoLeaderUpdatedCb will be called when the TSO leader is updated. - tsoLeaderUpdatedCb tsoLeaderURLUpdatedFunc + callbacks *serviceCallbacks checkMembershipCh chan struct{} @@ -474,12 +458,13 @@ func NewServiceDiscovery( cancel: cancel, wg: wg, apiCandidateNodes: [apiKindCount]*serviceBalancer{newServiceBalancer(emptyErrorFn), newServiceBalancer(regionAPIErrorFn)}, - serviceModeUpdateCb: serviceModeUpdateCb, + callbacks: newServiceCallbacks(), updateKeyspaceIDFunc: updateKeyspaceIDFunc, keyspaceID: keyspaceID, tlsCfg: tlsCfg, option: option, } + pdsd.callbacks.setServiceModeUpdateCallback(serviceModeUpdateCb) urls = tlsutil.AddrsToURLs(urls, tlsCfg) pdsd.urls.Store(urls) return pdsd @@ -570,7 +555,7 @@ func (c *serviceDiscovery) updateServiceModeLoop() { failpoint.Return() }) failpoint.Inject("usePDServiceMode", func() { - c.serviceModeUpdateCb(pdpb.ServiceMode_PD_SVC_MODE) + c.callbacks.onServiceModeUpdate(pdpb.ServiceMode_PD_SVC_MODE) failpoint.Return() }) @@ -791,27 +776,29 @@ func (c *serviceDiscovery) CheckMemberChanged() error { return c.updateMember() } -// AddServingURLSwitchedCallback adds callbacks which will be called -// when the leader is switched. -func (c *serviceDiscovery) AddServingURLSwitchedCallback(callbacks ...func()) { - c.leaderSwitchedCbs = append(c.leaderSwitchedCbs, callbacks...) -} - -// AddServiceURLsSwitchedCallback adds callbacks which will be called when -// any leader/follower is changed. -func (c *serviceDiscovery) AddServiceURLsSwitchedCallback(callbacks ...func()) { - c.membersChangedCbs = append(c.membersChangedCbs, callbacks...) -} - -// SetTSOLeaderURLUpdatedCallback adds a callback which will be called when the TSO leader is updated. -func (c *serviceDiscovery) SetTSOLeaderURLUpdatedCallback(callback tsoLeaderURLUpdatedFunc) { +// ExecAndAddLeaderSwitchedCallback executes the callback once and adds it to the callback list then. +func (c *serviceDiscovery) ExecAndAddLeaderSwitchedCallback(callback leaderSwitchedCallbackFunc) { url := c.getLeaderURL() if len(url) > 0 { if err := callback(url); err != nil { - log.Error("[tso] failed to call back when tso leader url update", zap.String("url", url), errs.ZapError(err)) + log.Error("[pd] failed to run a callback with the current leader url", + zap.String("url", url), errs.ZapError(err)) } } - c.tsoLeaderUpdatedCb = callback + c.AddLeaderSwitchedCallback(callback) +} + +// AddLeaderSwitchedCallback adds callbacks which will be called when the leader +// in a quorum-based cluster or the primary in a primary/secondary configured cluster +// is switched. +func (c *serviceDiscovery) AddLeaderSwitchedCallback(callback leaderSwitchedCallbackFunc) { + c.callbacks.addLeaderSwitchedCallback(callback) +} + +// AddMembersChangedCallback adds callbacks which will be called when any primary/secondary +// in a primary/secondary configured cluster is changed. +func (c *serviceDiscovery) AddMembersChangedCallback(callback func()) { + c.callbacks.addMembersChangedCallback(callback) } // getLeaderURL returns the leader URL. @@ -867,9 +854,7 @@ func (c *serviceDiscovery) checkServiceModeChanged() error { // If the method is not supported, we set it to pd mode. // TODO: it's a hack way to solve the compatibility issue. // we need to remove this after all maintained version supports the method. - if c.serviceModeUpdateCb != nil { - c.serviceModeUpdateCb(pdpb.ServiceMode_PD_SVC_MODE) - } + c.callbacks.onServiceModeUpdate(pdpb.ServiceMode_PD_SVC_MODE) return nil } return err @@ -877,9 +862,7 @@ func (c *serviceDiscovery) checkServiceModeChanged() error { if clusterInfo == nil || len(clusterInfo.ServiceModes) == 0 { return errors.WithStack(errs.ErrNoServiceModeReturned) } - if c.serviceModeUpdateCb != nil { - c.serviceModeUpdateCb(clusterInfo.ServiceModes[0]) - } + c.callbacks.onServiceModeUpdate(clusterInfo.ServiceModes[0]) return nil } @@ -968,9 +951,7 @@ func (c *serviceDiscovery) updateURLs(members []*pdpb.Member) { } c.urls.Store(urls) // Run callbacks to reflect the membership changes in the leader and followers. - for _, cb := range c.membersChangedCbs { - cb() - } + c.callbacks.onMembersChanged() log.Info("[pd] update member urls", zap.Strings("old-urls", oldURLs), zap.Strings("new-urls", urls)) } @@ -987,13 +968,8 @@ func (c *serviceDiscovery) switchLeader(url string) (bool, error) { c.leader.Store(leaderClient) } // Run callbacks - if c.tsoLeaderUpdatedCb != nil { - if err := c.tsoLeaderUpdatedCb(url); err != nil { - return true, err - } - } - for _, cb := range c.leaderSwitchedCbs { - cb() + if err := c.callbacks.onLeaderSwitched(url); err != nil { + return true, err } log.Info("[pd] switch leader", zap.String("new-leader", url), zap.String("old-leader", oldLeader.GetURL())) return true, err diff --git a/client/servicediscovery/service_discovery_test.go b/client/servicediscovery/service_discovery_test.go index 0a678718fdc..0c7d87d774b 100644 --- a/client/servicediscovery/service_discovery_test.go +++ b/client/servicediscovery/service_discovery_test.go @@ -400,7 +400,7 @@ func TestUpdateURLs(t *testing.T) { } return } - cli := &serviceDiscovery{option: opt.NewOption()} + cli := &serviceDiscovery{callbacks: newServiceCallbacks(), option: opt.NewOption()} cli.urls.Store([]string{}) cli.updateURLs(members[1:]) re.Equal(getURLs([]*pdpb.Member{members[1], members[3], members[2]}), cli.GetServiceURLs()) @@ -422,6 +422,7 @@ func TestGRPCDialOption(t *testing.T) { ctx, cancel := context.WithTimeout(context.TODO(), 500*time.Millisecond) defer cancel() cli := &serviceDiscovery{ + callbacks: newServiceCallbacks(), checkMembershipCh: make(chan struct{}, 1), ctx: ctx, cancel: cancel, diff --git a/client/servicediscovery/tso_service_discovery.go b/client/servicediscovery/tso_service_discovery.go index 7734fd23107..06401427522 100644 --- a/client/servicediscovery/tso_service_discovery.go +++ b/client/servicediscovery/tso_service_discovery.go @@ -55,10 +55,7 @@ const ( tsoQueryRetryInterval = 500 * time.Millisecond ) -var ( - _ ServiceDiscovery = (*tsoServiceDiscovery)(nil) - _ TSOEventSource = (*tsoServiceDiscovery)(nil) -) +var _ ServiceDiscovery = (*tsoServiceDiscovery)(nil) // keyspaceGroupSvcDiscovery is used for discovering the serving endpoints of the keyspace // group to which the keyspace belongs @@ -144,7 +141,7 @@ type tsoServiceDiscovery struct { clientConns sync.Map // Store as map[string]*grpc.ClientConn // tsoLeaderUpdatedCb will be called when the TSO leader is updated. - tsoLeaderUpdatedCb tsoLeaderURLUpdatedFunc + tsoLeaderUpdatedCb leaderSwitchedCallbackFunc checkMembershipCh chan struct{} @@ -361,16 +358,8 @@ func (c *tsoServiceDiscovery) CheckMemberChanged() error { return nil } -// AddServingURLSwitchedCallback adds callbacks which will be called when the primary in -// a primary/secondary configured cluster is switched. -func (*tsoServiceDiscovery) AddServingURLSwitchedCallback(...func()) {} - -// AddServiceURLsSwitchedCallback adds callbacks which will be called when any primary/secondary -// in a primary/secondary configured cluster is changed. -func (*tsoServiceDiscovery) AddServiceURLsSwitchedCallback(...func()) {} - -// SetTSOLeaderURLUpdatedCallback adds a callback which will be called when the TSO leader is updated. -func (c *tsoServiceDiscovery) SetTSOLeaderURLUpdatedCallback(callback tsoLeaderURLUpdatedFunc) { +// ExecAndAddLeaderSwitchedCallback executes the callback once and adds it to the callback list then. +func (c *tsoServiceDiscovery) ExecAndAddLeaderSwitchedCallback(callback leaderSwitchedCallbackFunc) { url := c.getPrimaryURL() if len(url) > 0 { if err := callback(url); err != nil { @@ -380,6 +369,14 @@ func (c *tsoServiceDiscovery) SetTSOLeaderURLUpdatedCallback(callback tsoLeaderU c.tsoLeaderUpdatedCb = callback } +// AddLeaderSwitchedCallback adds callbacks which will be called when the primary in +// a primary/secondary configured cluster is switched. +func (*tsoServiceDiscovery) AddLeaderSwitchedCallback(leaderSwitchedCallbackFunc) {} + +// AddMembersChangedCallback adds callbacks which will be called when any primary/secondary +// in a primary/secondary configured cluster is changed. +func (*tsoServiceDiscovery) AddMembersChangedCallback(func()) {} + // GetServiceClient implements ServiceDiscovery func (c *tsoServiceDiscovery) GetServiceClient() ServiceClient { return c.serviceDiscovery.GetServiceClient() diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index 0e0bf25d74d..af8cdc00a7e 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -238,8 +238,9 @@ func TestGetTSAfterTransferLeader(t *testing.T) { defer cli.Close() var leaderSwitched atomic.Bool - cli.GetServiceDiscovery().AddServingURLSwitchedCallback(func() { + cli.GetServiceDiscovery().AddLeaderSwitchedCallback(func(string) error { leaderSwitched.Store(true) + return nil }) err = cluster.GetServer(leader).ResignLeader() re.NoError(err)