From b14c51947e96392a7675f777de8fb0dc3da345fd Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Thu, 23 Jan 2025 16:18:31 +0800 Subject: [PATCH] *: adjust Signed-off-by: dongmen <414110582@qq.com> --- coordinator/coordinator.go | 7 ++++++- maintainer/maintainer_controller.go | 2 -- pkg/txnutil/gc/gc_service.go | 26 ++++---------------------- 3 files changed, 10 insertions(+), 25 deletions(-) diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index 65ac3572..e80a0ef9 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -308,7 +308,12 @@ func (c *coordinator) saveCheckpointTs(ctx context.Context, cfs map[common.Chang } func (c *coordinator) CreateChangefeed(ctx context.Context, info *config.ChangeFeedInfo) error { - return c.controller.CreateChangefeed(ctx, info) + err := c.controller.CreateChangefeed(ctx, info) + if err != nil { + return errors.Trace(err) + } + // update gc safepoint after create changefeed + return c.updateGCSafepoint(ctx) } func (c *coordinator) RemoveChangefeed(ctx context.Context, id common.ChangeFeedID) (uint64, error) { diff --git a/maintainer/maintainer_controller.go b/maintainer/maintainer_controller.go index 13396c81..d9d65236 100644 --- a/maintainer/maintainer_controller.go +++ b/maintainer/maintainer_controller.go @@ -52,7 +52,6 @@ type Controller struct { messageCenter messaging.MessageCenter nodeManager *watcher.NodeManager tsoClient replica.TSOClient - pdAPIClient pdutil.PDAPIClient splitter *split.Splitter enableTableAcrossNodes bool @@ -98,7 +97,6 @@ func NewController(changefeedID common.ChangeFeedID, taskScheduler: taskScheduler, cfConfig: cfConfig, tsoClient: tsoClient, - pdAPIClient: pdAPIClient, splitter: splitter, enableTableAcrossNodes: enableTableAcrossNodes, } diff --git a/pkg/txnutil/gc/gc_service.go b/pkg/txnutil/gc/gc_service.go index 59ffc679..735c3882 100644 --- a/pkg/txnutil/gc/gc_service.go +++ b/pkg/txnutil/gc/gc_service.go @@ -19,7 +19,6 @@ import ( "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/common" - "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/retry" pd "github.com/tikv/pd/client" @@ -44,35 +43,18 @@ func EnsureChangefeedStartTsSafety( changefeedID common.ChangeFeedID, TTL int64, startTs uint64, ) error { - cdcGcTTL := config.GetGlobalServerConfig().GcTTL - // set gc safepoint for ticdc gc service first - // This is to ensure that the ticdc gc service gcSafepoint is set before - // the changefeed gc service gcSafepoint - minServiceGCTs, err := SetServiceGCSafepoint( - ctx, pdCli, - ticdcServiceID, - cdcGcTTL, startTs) - if err != nil { - return errors.Trace(err) - } - log.Info("set gc safepoint for ticdc service", - zap.String("gcServiceID", ticdcServiceID), - zap.Uint64("expectedGCSafepoint", startTs), - zap.Uint64("actualGCSafepoint", minServiceGCTs), - zap.Int64("ttl", cdcGcTTL)) + gcServiceID := ticdcServiceID + tag + changefeedID.Namespace() + "_" + changefeedID.Name() // set gc safepoint for the changefeed gc service - // set gc safepoint for the changefeed gc service - minServiceGCTs, err = SetServiceGCSafepoint( + minServiceGCTs, err := SetServiceGCSafepoint( ctx, pdCli, - ticdcServiceID+tag+changefeedID.Namespace()+"_"+changefeedID.Name(), + gcServiceID, TTL, startTs) if err != nil { return errors.Trace(err) } - log.Info("set gc safepoint for changefeed", - zap.String("gcServiceID", ticdcServiceID+tag+changefeedID.Namespace()+"_"+changefeedID.Name()), + zap.String("gcServiceID", gcServiceID), zap.Uint64("expectedGCSafepoint", startTs), zap.Uint64("actualGCSafepoint", minServiceGCTs), zap.Int64("ttl", TTL))