Skip to content

Commit

Permalink
Merge pull request #24 from xiaocai2333/forbid_L2_compaction
Browse files Browse the repository at this point in the history
Check L2 segment is compacting
  • Loading branch information
wayblink authored May 24, 2024
2 parents b00d210 + fea87de commit ef0d53d
Showing 1 changed file with 30 additions and 3 deletions.
33 changes: 30 additions & 3 deletions internal/datacoord/compaction_policy_clustering.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package datacoord
import (
"context"
"fmt"
"sort"
"time"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/datapb"
Expand All @@ -12,8 +15,6 @@ import (
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/samber/lo"
"go.uber.org/zap"
"sort"
"time"
)

var _ CompactionPolicy = (*clusteringCompactionPolicy)(nil)
Expand Down Expand Up @@ -72,6 +73,27 @@ func (policy *clusteringCompactionPolicy) Trigger(ctx context.Context) (map[Comp
return events, nil
}

func (policy *clusteringCompactionPolicy) checkAllL2SegmentsContains(ctx context.Context, collectionID, partitionID int64, channel string) bool {
getCompactingL2Segment := func(segment *SegmentInfo) bool {
return segment.CollectionID == collectionID &&
segment.PartitionID == partitionID &&
segment.InsertChannel == channel &&
isSegmentHealthy(segment) &&
segment.GetLevel() == datapb.SegmentLevel_L2 &&
segment.isCompacting
}
segments := policy.meta.SelectSegments(SegmentFilterFunc(getCompactingL2Segment))
if len(segments) > 0 {
log.Ctx(ctx).Info("there are some segments are compacting",
zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID),
zap.String("channel", channel), zap.Int64s("compacting segment", lo.Map(segments, func(segment *SegmentInfo, i int) int64 {
return segment.GetID()
})))
return false
}
return true
}

func (policy *clusteringCompactionPolicy) triggerOneCollection(ctx context.Context, collectionID int64, ts Timestamp, manual bool) ([]CompactionView, int64, error) {
log.Info("trigger collection clustering compaction", zap.Int64("collectionID", collectionID))
collection, err := policy.handler.GetCollection(ctx, collectionID)
Expand Down Expand Up @@ -118,10 +140,15 @@ func (policy *clusteringCompactionPolicy) triggerOneCollection(ctx context.Conte
views := make([]CompactionView, 0)
// partSegments is list of chanPartSegments, which is channel-partition organized segments
for _, group := range partSegments {
log := log.With(zap.Int64("collectionID", group.collectionID),
log := log.Ctx(ctx).With(zap.Int64("collectionID", group.collectionID),
zap.Int64("partitionID", group.partitionID),
zap.String("channel", group.channelName))

if !policy.checkAllL2SegmentsContains(ctx, group.collectionID, group.partitionID, group.channelName) {
log.Warn("clustering compaction cannot be done, otherwise the performance will fall back")
continue
}

ct, err := getCompactTime(ts, collection)
if err != nil {
log.Warn("get compact time failed, skip to handle compaction")
Expand Down

0 comments on commit ef0d53d

Please sign in to comment.