Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: fix gc_safepoint #950

Merged
merged 5 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,12 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {
}
// Ensure the start ts is valid in the next 3600 seconds, aka 1 hour
const ensureTTL = 60 * 60
createGcServiceID := h.server.GetEtcdClient().GetGCServiceID()
if err = gc.EnsureChangefeedStartTsSafety(
ctx,
h.server.GetPdClient(),
h.server.GetEtcdClient().GetEnsureGCServiceID(gc.EnsureGCServiceCreating),
createGcServiceID,
gc.EnsureGCServiceCreating,
changefeedID,
ensureTTL, cfg.StartTs); err != nil {
if !errors.ErrStartTsBeforeGC.Equal(err) {
Expand Down Expand Up @@ -176,7 +178,7 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {
err := gc.UndoEnsureChangefeedStartTsSafety(
ctx,
pdClient,
h.server.GetEtcdClient().GetEnsureGCServiceID(gc.EnsureGCServiceCreating),
createGcServiceID,
changefeedID,
)
if err != nil {
Expand All @@ -200,7 +202,9 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {

log.Info("Create changefeed successfully!",
zap.String("id", info.ChangefeedID.Name()),
zap.String("changefeed", info.String()))
zap.String("state", string(info.State)),
zap.String("changefeedInfo", info.String()))

c.JSON(http.StatusOK, toAPIModel(
info,
&config.ChangeFeedStatus{
Expand Down Expand Up @@ -657,6 +661,7 @@ func verifyResumeChangefeedConfig(
ctx,
pdClient,
gcServiceID,
gc.EnsureGCServiceResuming,
changefeedID,
gcTTL, overrideCheckpointTs)
if err != nil {
Expand Down
9 changes: 9 additions & 0 deletions coordinator/changefeed/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func NewChangefeed(cfID common.ChangeFeedID,
log.Info("changefeed instance created",
zap.String("id", cfID.String()),
zap.Uint64("checkpointTs", checkpointTs),
zap.String("state", string(info.State)),
zap.String("info", info.String()))
return res
}
Expand Down Expand Up @@ -138,13 +139,21 @@ func (c *Changefeed) UpdateStatus(newStatus *heartbeatpb.MaintainerStatus) (bool

if newStatus != nil && newStatus.CheckpointTs >= old.CheckpointTs {
c.status.Store(newStatus)
if old.BootstrapDone != newStatus.BootstrapDone {
log.Info("Received changefeed status with bootstrapDone",
zap.String("changefeed", c.ID.String()),
zap.Bool("bootstrapDone", newStatus.BootstrapDone))
return true, model.StateNormal, nil
}

info := c.GetInfo()
// the changefeed reaches the targetTs
if info.TargetTs != 0 && newStatus.CheckpointTs >= info.TargetTs {
return true, model.StateFinished, nil
}
return c.backoff.CheckStatus(newStatus)
}

return false, model.StateNormal, nil
}

Expand Down
9 changes: 5 additions & 4 deletions coordinator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,12 +295,14 @@ func (c *Controller) HandleStatus(from node.ID, statusList []*heartbeatpb.Mainta
}
if nodeID != from {
// todo: handle the case that the node id is mismatch
log.Warn("node id not match",
log.Warn("remote changefeed maintainer nodeID mismatch with local record",
zap.String("changefeed", cfID.Name()),
zap.Stringer("from", from),
zap.Stringer("node", nodeID))
zap.Stringer("remoteNodeID", from),
zap.Stringer("localNodeID", nodeID))
continue
}
cfs[cfID] = cf

changed, state, err := cf.UpdateStatus(status)
if changed {
log.Info("changefeed status changed",
Expand All @@ -322,7 +324,6 @@ func (c *Controller) HandleStatus(from node.ID, statusList []*heartbeatpb.Mainta
err: mErr,
}
}
cfs[cfID] = cf
}
select {
case c.updatedChangefeedCh <- cfs:
Expand Down
39 changes: 35 additions & 4 deletions coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,13 @@ var (
metricsDSPendingQueueLen = metrics.DynamicStreamPendingQueueLen.WithLabelValues("coordinator")
)

// updateGCTickerInterval is the period of the GC ticker created in Run.
// It is a variable rather than a constant so that the
// InjectUpdateGCTickerInterval failpoint can override it; the injected
// value is interpreted as a number of milliseconds.
var updateGCTickerInterval = time.Minute

// coordinator implements the Coordinator interface
type coordinator struct {
nodeInfo *node.Info
version int64
gcServiceID string
lastTickTime time.Time

controller *Controller
Expand All @@ -74,7 +77,7 @@ func New(node *node.Info,
pdClient pd.Client,
pdClock pdutil.Clock,
backend changefeed.Backend,
clusterID string,
gcServiceID string,
version int64,
batchSize int,
balanceCheckInterval time.Duration,
Expand All @@ -83,8 +86,9 @@ func New(node *node.Info,
c := &coordinator{
version: version,
nodeInfo: node,
gcServiceID: gcServiceID,
lastTickTime: time.Now(),
gcManager: gc.NewManager(clusterID, pdClient, pdClock),
gcManager: gc.NewManager(gcServiceID, pdClient, pdClock),
eventCh: chann.NewAutoDrainChann[*Event](),
pdClient: pdClient,
pdClock: pdClock,
Expand Down Expand Up @@ -142,7 +146,10 @@ func (c *coordinator) recvMessages(_ context.Context, msg *messaging.TargetMessa
// - if a node is removed, clean up the related state machines that are bound to that node.
// 3. Schedule changefeeds once all nodes are bootstrapped.
func (c *coordinator) Run(ctx context.Context) error {
gcTick := time.NewTicker(time.Minute)
failpoint.Inject("InjectUpdateGCTickerInterval", func(val failpoint.Value) {
updateGCTickerInterval = time.Duration(val.(int) * int(time.Millisecond))
})
gcTick := time.NewTicker(updateGCTickerInterval)
ctx, cancel := context.WithCancel(ctx)
c.cancel = cancel
defer gcTick.Stop()
Expand Down Expand Up @@ -219,6 +226,20 @@ func (c *coordinator) handleStateChangedEvent(ctx context.Context, event *Change
c.controller.changefeedDB.Resume(event.ChangefeedID, false, false)
case model.StateFailed, model.StateFinished:
c.controller.operatorController.StopChangefeed(ctx, event.ChangefeedID, false)
case model.StateNormal:
log.Info("changefeed is resumed or created successfully, try to delete its gc safepoint",
zap.String("changefeed", event.ChangefeedID.String()))
// We need to clean its gc safepoint when changefeed is resumed or created
gcServiceID := c.getEnsureGCServiceID(gc.EnsureGCServiceCreating)
err := gc.UndoEnsureChangefeedStartTsSafety(ctx, c.pdClient, gcServiceID, event.ChangefeedID)
if err != nil {
log.Warn("failed to delete create changefeed gc safepoint", zap.Error(err))
}
gcServiceID = c.getEnsureGCServiceID(gc.EnsureGCServiceResuming)
err = gc.UndoEnsureChangefeedStartTsSafety(ctx, c.pdClient, gcServiceID, event.ChangefeedID)
if err != nil {
log.Warn("failed to delete resume changefeed gc safepoint", zap.Error(err))
}
default:
}
return nil
Expand Down Expand Up @@ -287,7 +308,12 @@ func (c *coordinator) saveCheckpointTs(ctx context.Context, cfs map[common.Chang
}

func (c *coordinator) CreateChangefeed(ctx context.Context, info *config.ChangeFeedInfo) error {
return c.controller.CreateChangefeed(ctx, info)
err := c.controller.CreateChangefeed(ctx, info)
if err != nil {
return errors.Trace(err)
}
// update gc safepoint after create changefeed
return c.updateGCSafepoint(ctx)
}

func (c *coordinator) RemoveChangefeed(ctx context.Context, id common.ChangeFeedID) (uint64, error) {
Expand Down Expand Up @@ -358,3 +384,8 @@ func (c *coordinator) updateGCSafepoint(
err := c.gcManager.TryUpdateGCSafePoint(ctx, gcSafepointUpperBound, false)
return errors.Trace(err)
}

// getEnsureGCServiceID returns the GC service ID formed by appending tag to
// the coordinator's base gcServiceID. Callers in this file pass
// gc.EnsureGCServiceCreating or gc.EnsureGCServiceResuming as the tag to
// address the safepoint registered while a changefeed was being created or
// resumed.
func (c *coordinator) getEnsureGCServiceID(tag string) string {
	serviceID := c.gcServiceID
	return serviceID + tag
}
Loading
Loading