Skip to content

Commit

Permalink
fix potential bug
Browse files Browse the repository at this point in the history
  • Loading branch information
wayblink committed May 28, 2024
1 parent 2d69006 commit c851363
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 27 deletions.
9 changes: 6 additions & 3 deletions internal/datacoord/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,12 @@ func (c *compactionPlanHandler) enqueueCompaction(task CompactionTask) error {
// todo optimize @wayblink
// currently compaction trigger v1 v2 both exist and have no shared lock between them to protect
// temporarily refuse the enqueue
succeed := c.checkAndSetSegmentsCompacting(task)
exist, succeed := c.checkAndSetSegmentsCompacting(task)
if !exist {
return merr.WrapErrIllegalCompactionPlan("segment not exist")
}
if !succeed {
return errors.New("compaction plan conflict")
return merr.WrapErrCompactionPlanConflict("segment is compacting")
}

_, span := otel.Tracer(typeutil.DataCoordRole).Start(context.Background(), fmt.Sprintf("Compaction-%s", task.GetType()))
Expand Down Expand Up @@ -331,7 +334,7 @@ func (c *compactionPlanHandler) setSegmentsCompacting(task CompactionTask, compa
}
}

func (c *compactionPlanHandler) checkAndSetSegmentsCompacting(task CompactionTask) bool {
func (c *compactionPlanHandler) checkAndSetSegmentsCompacting(task CompactionTask) (bool, bool) {
return c.meta.CheckAndSetSegmentsCompacting(task.GetInputSegments())
}

Expand Down
15 changes: 10 additions & 5 deletions internal/datacoord/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type CompactionMeta interface {
GetHealthySegment(segID UniqueID) *SegmentInfo
UpdateSegmentsInfo(operators ...UpdateOperator) error
SetSegmentCompacting(segmentID int64, compacting bool)
CheckAndSetSegmentsCompacting(segmentIDs []int64) bool
CheckAndSetSegmentsCompacting(segmentIDs []int64) (bool, bool)

SaveClusteringCompactionTask(task *datapb.CompactionTask) error
DropClusteringCompactionTask(task *datapb.CompactionTask) error
Expand Down Expand Up @@ -1252,19 +1252,24 @@ func (m *meta) SetSegmentCompacting(segmentID UniqueID, compacting bool) {
// CheckAndSetSegmentsCompacting check all segments are not compacting
// if true, set them compacting and return true
// if false, skip setting and
func (m *meta) CheckAndSetSegmentsCompacting(segmentIDs []UniqueID) (hasCompactingSegment bool) {
func (m *meta) CheckAndSetSegmentsCompacting(segmentIDs []UniqueID) (exist, hasCompactingSegment bool) {
m.Lock()
defer m.Unlock()
for _, segmentID := range segmentIDs {
hasCompactingSegment = m.segments.GetSegment(segmentID).isCompacting
seg := m.segments.GetSegment(segmentID)
if seg != nil {
hasCompactingSegment = seg.isCompacting
} else {
return false, false
}
}
if hasCompactingSegment {
return false
return true, false
}
for _, segmentID := range segmentIDs {
m.segments.SetIsCompacting(segmentID, true)
}
return true
return true, true
}

// SetSegmentLevel sets level for segment
Expand Down
20 changes: 15 additions & 5 deletions internal/datacoord/mock_compaction_meta.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 12 additions & 8 deletions internal/datanode/compaction/clustering_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,15 +549,17 @@ func (t *clusteringCompactionTask) mappingSegment(
remained++

currentSize := t.totalBufferSize.Load()
// trigger spill
if clusterBuffer.writer.GetRowNum() > t.plan.GetMaxSegmentRows() || clusterBuffer.writer.IsFull() {
// reach segment/binlog max size
t.spillChan <- SpillSignal{
buffer: clusterBuffer,
if (remained+1)%20 == 0 {
// trigger spill
if clusterBuffer.writer.GetRowNum() > t.plan.GetMaxSegmentRows() || clusterBuffer.writer.IsFull() {
// reach segment/binlog max size
t.spillChan <- SpillSignal{
buffer: clusterBuffer,
}
} else if currentSize >= t.getMemoryBufferMiddleWatermark() {
// reach spill trigger threshold
t.spillChan <- SpillSignal{}
}
} else if currentSize >= t.getMemoryBufferMiddleWatermark() {
// reach spill trigger threshold
t.spillChan <- SpillSignal{}
}

// if the total buffer size is too large, block here, wait for memory release by spill
Expand Down Expand Up @@ -751,6 +753,7 @@ func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buff
}

func (t *clusteringCompactionTask) spill(ctx context.Context, buffer *ClusterBuffer) error {
log := log.With(zap.Int("bufferID", buffer.id), zap.Int64("bufferSize", buffer.bufferRowNum.Load()))
if buffer.writer.IsEmpty() {
return nil
}
Expand Down Expand Up @@ -780,6 +783,7 @@ func (t *clusteringCompactionTask) spill(ctx context.Context, buffer *ClusterBuf
buffer.bufferSize.Store(0)
buffer.bufferRowNum.Store(0)

log.Info("finish spill binlogs")
if buffer.flushedRowNum > t.plan.GetMaxSegmentRows() {
if err := t.packBufferToSegment(ctx, buffer); err != nil {
return err
Expand Down
13 changes: 7 additions & 6 deletions pkg/util/merr/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,13 @@ var (
// Compaction
ErrCompactionReadDeltaLogErr = newMilvusError("fail to read delta log", 2300, false)
ErrIllegalCompactionPlan = newMilvusError("compaction plan illegal", 2301, false)
ErrClusteringCompactionClusterNotSupport = newMilvusError("milvus cluster not support clustering compaction", 2302, false)
ErrClusteringCompactionCollectionNotSupport = newMilvusError("collection not support clustering compaction", 2303, false)
ErrClusteringCompactionCollectionIsCompacting = newMilvusError("collection is compacting", 2304, false)
ErrClusteringCompactionNotSupportVector = newMilvusError("vector field clustering compaction is not supported", 2305, false)
ErrClusteringCompactionSubmitTaskFail = newMilvusError("fail to submit task", 2306, true)
ErrClusteringCompactionMetaError = newMilvusError("fail to update meta in clustering compaction", 2307, true)
ErrCompactionPlanConflict = newMilvusError("compaction plan conflict", 2302, false)
ErrClusteringCompactionClusterNotSupport = newMilvusError("milvus cluster not support clustering compaction", 2303, false)
ErrClusteringCompactionCollectionNotSupport = newMilvusError("collection not support clustering compaction", 2304, false)
ErrClusteringCompactionCollectionIsCompacting = newMilvusError("collection is compacting", 2305, false)
ErrClusteringCompactionNotSupportVector = newMilvusError("vector field clustering compaction is not supported", 2306, false)
ErrClusteringCompactionSubmitTaskFail = newMilvusError("fail to submit task", 2307, true)
ErrClusteringCompactionMetaError = newMilvusError("fail to update meta in clustering compaction", 2308, true)

// General
ErrOperationNotSupported = newMilvusError("unsupported operation", 3000, false)
Expand Down
8 changes: 8 additions & 0 deletions pkg/util/merr/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -1056,6 +1056,14 @@ func WrapErrIllegalCompactionPlan(msg ...string) error {
return err
}

func WrapErrCompactionPlanConflict(msg ...string) error {
err := error(ErrCompactionPlanConflict)
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "->"))
}
return err
}

func WrapErrClusteringCompactionClusterNotSupport(msg ...string) error {
err := error(ErrClusteringCompactionClusterNotSupport)
if len(msg) > 0 {
Expand Down

0 comments on commit c851363

Please sign in to comment.