Skip to content

Commit

Permalink
add count
Browse files Browse the repository at this point in the history
  • Loading branch information
wayblink committed May 28, 2024
1 parent c851363 commit 6fa13a9
Showing 1 changed file with 11 additions and 5 deletions.
16 changes: 11 additions & 5 deletions internal/datanode/compaction/clustering_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ import (
"sort"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/cockroachdb/errors"
"github.com/golang/protobuf/proto"
"github.com/samber/lo"
"go.opentelemetry.io/otel"
"go.uber.org/atomic"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
Expand Down Expand Up @@ -75,10 +75,11 @@ type clusteringCompactionTask struct {
plan *datapb.CompactionPlan

// schedule
totalBufferSize atomic.Int64
totalBufferSize *atomic.Int64
spillChan chan SpillSignal
spillCount atomic.Int64
spillCount *atomic.Int64
pool *conc.Pool[any]
writeRowNum *atomic.Int64

// inner field
collectionID int64
Expand Down Expand Up @@ -136,10 +137,12 @@ func NewClusteringCompactionTask(
plan: plan,
tr: timerecord.NewTimeRecorder("clustering_compaction"),
done: make(chan struct{}, 1),
totalBufferSize: atomic.Int64{},
totalBufferSize: atomic.NewInt64(0),
spillCount: atomic.NewInt64(0),
spillChan: make(chan SpillSignal, 100),
clusterBuffers: make([]*ClusterBuffer, 0),
clusterBufferLocks: lock.NewKeyLock[int](),
writeRowNum: atomic.NewInt64(0),
}
}

Expand Down Expand Up @@ -589,6 +592,7 @@ func (t *clusteringCompactionTask) mappingSegment(
zap.Int64("remained_entities", remained),
zap.Int64("deleted_entities", deleted),
zap.Int64("expired_entities", expired),
zap.Int64("writeRowNum", t.writeRowNum.Load()),
zap.Duration("elapse", time.Since(processStart)))
return nil
}
Expand All @@ -607,6 +611,7 @@ func (t *clusteringCompactionTask) writeToBuffer(ctx context.Context, clusterBuf
if err != nil {
return err
}
t.writeRowNum.Inc()
clusterBuffer.bufferSize.Add(int64(rowSize))
clusterBuffer.bufferRowNum.Add(1)
t.totalBufferSize.Add(int64(rowSize))
Expand Down Expand Up @@ -783,7 +788,8 @@ func (t *clusteringCompactionTask) spill(ctx context.Context, buffer *ClusterBuf
buffer.bufferSize.Store(0)
buffer.bufferRowNum.Store(0)

log.Info("finish spill binlogs")
t.spillCount.Inc()
log.Info("finish spill binlogs", zap.Int64("spillCount", t.spillCount.Load()))
if buffer.flushedRowNum > t.plan.GetMaxSegmentRows() {
if err := t.packBufferToSegment(ctx, buffer); err != nil {
return err
Expand Down

0 comments on commit 6fa13a9

Please sign in to comment.