
Major compaction
Signed-off-by: wayblink <[email protected]>
wayblink committed Jun 6, 2024
1 parent 27cc9f2 commit 0aff609
Showing 92 changed files with 4,994 additions and 678 deletions.
208 changes: 160 additions & 48 deletions internal/datacoord/compaction.go
@@ -39,7 +39,13 @@ import (
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

//go:generate mockery --name=compactionPlanContext --structname=MockCompactionPlanContext --output=./ --filename=mock_compaction_plan_context.go --with-expecter --inpackage
// TODO: this number should be determined by the DataNode's resources; for now we use a fixed value for simplicity
// TODO: compaction should be split into priorities: small compactions help merge segments, while large compactions help handle deltas and the expiration of large segments
const (
tsTimeout = uint64(1)
taskMaxRetryTimes = int32(3)
)

type compactionPlanContext interface {
start()
stop()
@@ -77,11 +83,13 @@ type compactionPlanHandler struct {
executingMu lock.RWMutex
executingTasks map[int64]CompactionTask // planID -> task

meta CompactionMeta
allocator allocator
chManager ChannelManager
sessions SessionManager
cluster Cluster
meta CompactionMeta
allocator allocator
chManager ChannelManager
sessions SessionManager
cluster Cluster
analyzeScheduler *taskScheduler
handler Handler

stopCh chan struct{}
stopOnce sync.Once
@@ -91,39 +99,66 @@ type compactionPlanHandler struct {
}

func (c *compactionPlanHandler) getCompactionInfo(triggerID int64) *compactionInfo {
var executingCnt int
var completedCnt int
var failedCnt int
var timeoutCnt int
ret := &compactionInfo{}
tasks := c.meta.GetCompactionTasksByTriggerID(triggerID)
return summaryCompactionState(tasks)
}

func summaryCompactionState(tasks []*datapb.CompactionTask) *compactionInfo {
ret := &compactionInfo{}
var executingCnt, pipeliningCnt, completedCnt, failedCnt, timeoutCnt, analyzingCnt, indexingCnt, cleanedCnt, metaSavedCnt int
mergeInfos := make(map[int64]*milvuspb.CompactionMergeInfo)
tasks := c.meta.GetCompactionTasksByTriggerID(triggerID)
for _, t := range tasks {
switch t.GetState() {
case datapb.CompactionTaskState_pipelining, datapb.CompactionTaskState_executing, datapb.CompactionTaskState_meta_saved:

for _, task := range tasks {
if task == nil {
continue
}
switch task.GetState() {
case datapb.CompactionTaskState_executing:
executingCnt++
case datapb.CompactionTaskState_pipelining:
pipeliningCnt++
case datapb.CompactionTaskState_completed:
completedCnt++
case datapb.CompactionTaskState_failed:
failedCnt++
case datapb.CompactionTaskState_timeout:
timeoutCnt++
case datapb.CompactionTaskState_analyzing:
analyzingCnt++
case datapb.CompactionTaskState_indexing:
indexingCnt++
case datapb.CompactionTaskState_cleaned:
cleanedCnt++
case datapb.CompactionTaskState_meta_saved:
metaSavedCnt++
default:
}
mergeInfos[t.GetPlanID()] = getCompactionMergeInfo(t)
mergeInfos[task.GetPlanID()] = getCompactionMergeInfo(task)
}

ret.executingCnt = executingCnt
ret.executingCnt = executingCnt + pipeliningCnt + analyzingCnt + indexingCnt + metaSavedCnt
ret.completedCnt = completedCnt
ret.timeoutCnt = timeoutCnt
ret.failedCnt = failedCnt
ret.mergeInfos = mergeInfos

if executingCnt != 0 {
if ret.executingCnt != 0 {
ret.state = commonpb.CompactionState_Executing
} else {
ret.state = commonpb.CompactionState_Completed
}

log.Info("compaction states",
zap.String("state", ret.state.String()),
zap.Int("executingCnt", executingCnt),
zap.Int("pipeliningCnt", pipeliningCnt),
zap.Int("completedCnt", completedCnt),
zap.Int("failedCnt", failedCnt),
zap.Int("timeoutCnt", timeoutCnt),
zap.Int("analyzingCnt", analyzingCnt),
zap.Int("indexingCnt", indexingCnt),
zap.Int("cleanedCnt", cleanedCnt),
zap.Int("metaSavedCnt", metaSavedCnt))
return ret
}
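
Note: the aggregation above folds every non-terminal state (pipelining, executing, analyzing, indexing, meta_saved) into executingCnt, so a trigger keeps reporting CompactionState_Executing until all of its tasks reach a terminal state (completed, failed, timeout, or cleaned). A minimal, self-contained sketch of that rule, using stand-in state names rather than the datapb enum:

// Illustrative sketch only: stand-in types, not the datapb definitions.
type taskState int

const (
	statePipelining taskState = iota
	stateExecuting
	stateAnalyzing
	stateIndexing
	stateMetaSaved
	stateCompleted
	stateFailed
	stateTimeout
	stateCleaned
)

// triggerFinished reports whether every task of a trigger has reached a terminal state.
func triggerFinished(states []taskState) bool {
	for _, s := range states {
		switch s {
		case stateCompleted, stateFailed, stateTimeout, stateCleaned:
			// terminal: does not keep the trigger in the Executing state
		default:
			return false
		}
	}
	return true
}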

@@ -149,18 +184,20 @@ func (c *compactionPlanHandler) getCompactionTasksNumBySignalID(triggerID int64)
return cnt
}

func newCompactionPlanHandler(cluster Cluster, sessions SessionManager, cm ChannelManager, meta CompactionMeta, allocator allocator,
func newCompactionPlanHandler(cluster Cluster, sessions SessionManager, cm ChannelManager, meta CompactionMeta, allocator allocator, analyzeScheduler *taskScheduler, handler Handler,
) *compactionPlanHandler {
return &compactionPlanHandler{
queueTasks: make(map[int64]CompactionTask),
chManager: cm,
meta: meta,
sessions: sessions,
allocator: allocator,
stopCh: make(chan struct{}),
cluster: cluster,
executingTasks: make(map[int64]CompactionTask),
taskNumber: atomic.NewInt32(0),
queueTasks: make(map[int64]CompactionTask),
chManager: cm,
meta: meta,
sessions: sessions,
allocator: allocator,
stopCh: make(chan struct{}),
cluster: cluster,
executingTasks: make(map[int64]CompactionTask),
taskNumber: atomic.NewInt32(0),
analyzeScheduler: analyzeScheduler,
handler: handler,
}
}

@@ -174,9 +211,9 @@ func (c *compactionPlanHandler) schedule() []CompactionTask {

l0ChannelExcludes := typeutil.NewSet[string]()
mixChannelExcludes := typeutil.NewSet[string]()
// clusterChannelExcludes := typeutil.NewSet[string]()
clusterChannelExcludes := typeutil.NewSet[string]()
mixLabelExcludes := typeutil.NewSet[string]()
// clusterLabelExcludes := typeutil.NewSet[string]()
clusterLabelExcludes := typeutil.NewSet[string]()

c.executingMu.RLock()
for _, t := range c.executingTasks {
@@ -186,9 +223,9 @@ func (c *compactionPlanHandler) schedule() []CompactionTask {
case datapb.CompactionType_MixCompaction:
mixChannelExcludes.Insert(t.GetChannel())
mixLabelExcludes.Insert(t.GetLabel())
// case datapb.CompactionType_ClusteringCompaction:
// clusterChannelExcludes.Insert(t.GetChannel())
// clusterLabelExcludes.Insert(t.GetLabel())
case datapb.CompactionType_ClusteringCompaction:
clusterChannelExcludes.Insert(t.GetChannel())
clusterLabelExcludes.Insert(t.GetLabel())
}
}
c.executingMu.RUnlock()
@@ -217,28 +254,40 @@ func (c *compactionPlanHandler) schedule() []CompactionTask {
picked = append(picked, t)
mixChannelExcludes.Insert(t.GetChannel())
mixLabelExcludes.Insert(t.GetLabel())
// case datapb.CompactionType_ClusteringCompaction:
// if l0ChannelExcludes.Contain(t.GetChannel()) ||
// mixLabelExcludes.Contain(t.GetLabel()) ||
// clusterLabelExcludes.Contain(t.GetLabel()){
// continue
// }
// picked = append(picked, t)
// slot -= 1
// clusterChannelExcludes.Insert(t.GetChannel())
// clusterLabelExcludes.Insert(t.GetLabel())
case datapb.CompactionType_ClusteringCompaction:
if l0ChannelExcludes.Contain(t.GetChannel()) ||
mixLabelExcludes.Contain(t.GetLabel()) ||
clusterLabelExcludes.Contain(t.GetLabel()) {
continue
}
picked = append(picked, t)
clusterChannelExcludes.Insert(t.GetChannel())
clusterLabelExcludes.Insert(t.GetLabel())
}
}
return picked
}
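
For the newly enabled clustering case, the exclusion logic visible above boils down to: skip a queued clustering task when its channel already carries an L0 task, or when its label is already claimed by an executing mix or clustering task; otherwise pick it and claim its channel and label. A stand-alone sketch of just that predicate (illustrative only, with plain maps instead of typeutil.Set):

// canPickClustering mirrors the clustering branch of schedule() in isolation.
func canPickClustering(channel, label string,
	l0Channels, mixLabels, clusterLabels map[string]bool) bool {
	if l0Channels[channel] || mixLabels[label] || clusterLabels[label] {
		return false
	}
	return true
}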

func (c *compactionPlanHandler) start() {
c.loadMeta()
c.stopWg.Add(3)
go c.loopSchedule()
go c.loopCheck()
go c.loopClean()
}

func (c *compactionPlanHandler) loadMeta() {
// TODO: make this compatible with all compaction types that persist their meta
triggers := c.meta.(*meta).compactionTaskMeta.GetCompactionTasks()
for _, tasks := range triggers {
for _, task := range tasks {
if task.State != datapb.CompactionTaskState_completed && task.State != datapb.CompactionTaskState_cleaned {
c.enqueueCompaction(task)
}
}
}
}
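
On restart, loadMeta walks the persisted compaction tasks and re-enqueues everything that is not yet completed or cleaned, so in-flight compactions are resumed rather than lost. A small sketch of just that filter, with a hypothetical stand-in for datapb.CompactionTask:

// Illustrative sketch only: persistedTask stands in for datapb.CompactionTask.
type persistedTask struct {
	PlanID int64
	State  string // e.g. "executing", "completed", "cleaned"
}

// tasksToReload returns the tasks a restarted coordinator should re-enqueue:
// everything except tasks that already finished or were cleaned up.
func tasksToReload(all []persistedTask) []persistedTask {
	reload := make([]persistedTask, 0, len(all))
	for _, t := range all {
		if t.State == "completed" || t.State == "cleaned" {
			continue
		}
		reload = append(reload, t)
	}
	return reload
}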

func (c *compactionPlanHandler) doSchedule() {
picked := c.schedule()
if len(picked) > 0 {
Expand Down Expand Up @@ -311,6 +360,7 @@ func (c *compactionPlanHandler) loopClean() {

func (c *compactionPlanHandler) Clean() {
c.cleanCompactionTaskMeta()
c.cleanPartitionStats()
}

func (c *compactionPlanHandler) cleanCompactionTaskMeta() {
@@ -332,6 +382,56 @@ func (c *compactionPlanHandler) cleanCompactionTaskMeta() {
}
}

func (c *compactionPlanHandler) cleanPartitionStats() error {
log.Debug("start gc partitionStats meta and files")
// gc partition stats
channelPartitionStatsInfos := make(map[string][]*datapb.PartitionStatsInfo)
unusedPartStats := make([]*datapb.PartitionStatsInfo, 0)
if c.meta.GetPartitionStatsMeta() == nil {
return nil
}
infos := c.meta.GetPartitionStatsMeta().ListAllPartitionStatsInfos()
for _, info := range infos {
collInfo := c.meta.(*meta).GetCollection(info.GetCollectionID())
if collInfo == nil {
unusedPartStats = append(unusedPartStats, info)
continue
}
channel := fmt.Sprintf("%d/%d/%s", info.CollectionID, info.PartitionID, info.VChannel)
if _, ok := channelPartitionStatsInfos[channel]; !ok {
channelPartitionStatsInfos[channel] = make([]*datapb.PartitionStatsInfo, 0)
}
channelPartitionStatsInfos[channel] = append(channelPartitionStatsInfos[channel], info)
}
log.Debug("channels with PartitionStats meta", zap.Int("len", len(channelPartitionStatsInfos)))

for _, info := range unusedPartStats {
log.Debug("collection has been dropped, remove partition stats",
zap.Int64("collID", info.GetCollectionID()))
if err := c.meta.CleanPartitionStatsInfo(info); err != nil {
log.Warn("gcPartitionStatsInfo fail", zap.Error(err))
return err
}
}

for channel, infos := range channelPartitionStatsInfos {
sort.Slice(infos, func(i, j int) bool {
return infos[i].Version > infos[j].Version
})
log.Debug("PartitionStats in channel", zap.String("channel", channel), zap.Int("len", len(infos)))
if len(infos) > 2 {
for i := 2; i < len(infos); i++ {
info := infos[i]
if err := c.meta.CleanPartitionStatsInfo(info); err != nil {
log.Warn("gcPartitionStatsInfo fail", zap.Error(err))
return err
}
}
}
}
return nil
}
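
cleanPartitionStats therefore does three things: it drops stats whose collection no longer exists, groups the rest per collection/partition/vchannel, and keeps only the two newest versions in each group, deleting everything older. The retention step in isolation could look like the sketch below (statsInfo and staleStats are illustrative names, not the production types):

import "sort"

// Illustrative sketch only: statsInfo stands in for datapb.PartitionStatsInfo.
type statsInfo struct {
	Channel string // collectionID/partitionID/vchannel key
	Version int64
}

// staleStats returns, per channel, every stats entry except the newest `keep` versions.
func staleStats(infos []statsInfo, keep int) []statsInfo {
	byChannel := make(map[string][]statsInfo)
	for _, info := range infos {
		byChannel[info.Channel] = append(byChannel[info.Channel], info)
	}
	var stale []statsInfo
	for _, list := range byChannel {
		sort.Slice(list, func(i, j int) bool { return list[i].Version > list[j].Version })
		if len(list) > keep {
			stale = append(stale, list[keep:]...)
		}
	}
	return stale
}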

func (c *compactionPlanHandler) stop() {
c.stopOnce.Do(func() {
close(c.stopCh)
Expand Down Expand Up @@ -407,7 +507,7 @@ func (c *compactionPlanHandler) getCompactionTask(planID int64) CompactionTask {
}

func (c *compactionPlanHandler) enqueueCompaction(task *datapb.CompactionTask) error {
log := log.With(zap.Int64("planID", task.GetPlanID()), zap.Int64("collectionID", task.GetCollectionID()), zap.String("type", task.GetType().String()))
log := log.With(zap.Int64("planID", task.GetPlanID()), zap.Int64("triggerID", task.GetTriggerID()), zap.Int64("collectionID", task.GetCollectionID()), zap.String("type", task.GetType().String()))
if c.isFull() {
return errCompactionBusy
}
@@ -425,11 +525,14 @@ func (c *compactionPlanHandler) enqueueCompaction(task *datapb.CompactionTask) error {
if t == nil {
return merr.WrapErrIllegalCompactionPlan("illegal compaction type")
}
t.SetTask(t.ShadowClone(setStartTime(time.Now().Unix())))
err := t.SaveTaskMeta()
if err != nil {
return err
if task.StartTime != 0 {
t.SetTask(t.ShadowClone(setStartTime(time.Now().Unix())))
err := t.SaveTaskMeta()
if err != nil {
return err
}
}

_, span := otel.Tracer(typeutil.DataCoordRole).Start(context.Background(), fmt.Sprintf("Compaction-%s", task.GetType()))
t.SetSpan(span)

@@ -454,6 +557,15 @@ func (c *compactionPlanHandler) createCompactTask(t *datapb.CompactionTask) CompactionTask {
meta: c.meta,
sessions: c.sessions,
}
case datapb.CompactionType_ClusteringCompaction:
task = &clusteringCompactionTask{
CompactionTask: t,
meta: c.meta,
sessions: c.sessions,
handler: c.handler,
analyzeScheduler: c.analyzeScheduler,
allocator: c.allocator,
}
}
return task
}
13 changes: 13 additions & 0 deletions internal/datacoord/compaction_policy.go
@@ -0,0 +1,13 @@
package datacoord

import (
"context"
"time"
)

type CompactionPolicy interface {
Enable() bool
Ticker() *time.Ticker
Stop()
Trigger(ctx context.Context) (map[CompactionTriggerType][]CompactionView, error)
}
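
Any concrete policy only needs these four methods: Enable gates the policy, Ticker drives how often it is polled, Stop releases the ticker, and Trigger produces candidate views grouped by trigger type. As a rough, invented illustration (fixedIntervalPolicy is not part of this commit, and the sketch assumes the package's CompactionTriggerType and CompactionView types plus the time and context imports):

// Illustrative sketch only: not part of this commit.
type fixedIntervalPolicy struct {
	enabled bool
	ticker  *time.Ticker
}

func newFixedIntervalPolicy(interval time.Duration) *fixedIntervalPolicy {
	return &fixedIntervalPolicy{enabled: true, ticker: time.NewTicker(interval)}
}

func (p *fixedIntervalPolicy) Enable() bool         { return p.enabled }
func (p *fixedIntervalPolicy) Ticker() *time.Ticker { return p.ticker }
func (p *fixedIntervalPolicy) Stop()                { p.ticker.Stop() }

// Trigger would inspect segment metadata and return the views to compact;
// this stub returns an empty map so callers simply see nothing to do.
func (p *fixedIntervalPolicy) Trigger(ctx context.Context) (map[CompactionTriggerType][]CompactionView, error) {
	return map[CompactionTriggerType][]CompactionView{}, nil
}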
