Skip to content

Commit

Permalink
*: adjust
Browse files Browse the repository at this point in the history
Signed-off-by: dongmen <[email protected]>
  • Loading branch information
asddongmen committed Jan 23, 2025
1 parent aa784f0 commit b14c519
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 25 deletions.
7 changes: 6 additions & 1 deletion coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,12 @@ func (c *coordinator) saveCheckpointTs(ctx context.Context, cfs map[common.Chang
}

func (c *coordinator) CreateChangefeed(ctx context.Context, info *config.ChangeFeedInfo) error {
return c.controller.CreateChangefeed(ctx, info)
err := c.controller.CreateChangefeed(ctx, info)
if err != nil {
return errors.Trace(err)
}
// update gc safepoint after create changefeed
return c.updateGCSafepoint(ctx)
}

func (c *coordinator) RemoveChangefeed(ctx context.Context, id common.ChangeFeedID) (uint64, error) {
Expand Down
2 changes: 0 additions & 2 deletions maintainer/maintainer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ type Controller struct {
messageCenter messaging.MessageCenter
nodeManager *watcher.NodeManager
tsoClient replica.TSOClient
pdAPIClient pdutil.PDAPIClient

splitter *split.Splitter
enableTableAcrossNodes bool
Expand Down Expand Up @@ -98,7 +97,6 @@ func NewController(changefeedID common.ChangeFeedID,
taskScheduler: taskScheduler,
cfConfig: cfConfig,
tsoClient: tsoClient,
pdAPIClient: pdAPIClient,
splitter: splitter,
enableTableAcrossNodes: enableTableAcrossNodes,
}
Expand Down
26 changes: 4 additions & 22 deletions pkg/txnutil/gc/gc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (

"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/common"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/retry"
pd "github.com/tikv/pd/client"
Expand All @@ -44,35 +43,18 @@ func EnsureChangefeedStartTsSafety(
changefeedID common.ChangeFeedID,
TTL int64, startTs uint64,
) error {
cdcGcTTL := config.GetGlobalServerConfig().GcTTL
// set gc safepoint for ticdc gc service first
// This is to ensure that the ticdc gc service gcSafepoint is set before
// the changefeed gc service gcSafepoint
minServiceGCTs, err := SetServiceGCSafepoint(
ctx, pdCli,
ticdcServiceID,
cdcGcTTL, startTs)
if err != nil {
return errors.Trace(err)
}
log.Info("set gc safepoint for ticdc service",
zap.String("gcServiceID", ticdcServiceID),
zap.Uint64("expectedGCSafepoint", startTs),
zap.Uint64("actualGCSafepoint", minServiceGCTs),
zap.Int64("ttl", cdcGcTTL))
gcServiceID := ticdcServiceID + tag + changefeedID.Namespace() + "_" + changefeedID.Name()

// set gc safepoint for the changefeed gc service
// set gc safepoint for the changefeed gc service
minServiceGCTs, err = SetServiceGCSafepoint(
minServiceGCTs, err := SetServiceGCSafepoint(
ctx, pdCli,
ticdcServiceID+tag+changefeedID.Namespace()+"_"+changefeedID.Name(),
gcServiceID,
TTL, startTs)
if err != nil {
return errors.Trace(err)
}

log.Info("set gc safepoint for changefeed",
zap.String("gcServiceID", ticdcServiceID+tag+changefeedID.Namespace()+"_"+changefeedID.Name()),
zap.String("gcServiceID", gcServiceID),
zap.Uint64("expectedGCSafepoint", startTs),
zap.Uint64("actualGCSafepoint", minServiceGCTs),
zap.Int64("ttl", TTL))
Expand Down

0 comments on commit b14c519

Please sign in to comment.