From 6fa13a9a32ce76ef363ade1d5eb2a68310ec9441 Mon Sep 17 00:00:00 2001 From: wayblink Date: Tue, 28 May 2024 20:10:34 +0800 Subject: [PATCH] add count --- .../datanode/compaction/clustering_compactor.go | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/internal/datanode/compaction/clustering_compactor.go b/internal/datanode/compaction/clustering_compactor.go index b66e76c59cfee..921334677404f 100644 --- a/internal/datanode/compaction/clustering_compactor.go +++ b/internal/datanode/compaction/clustering_compactor.go @@ -24,13 +24,13 @@ import ( "sort" "strconv" "sync" - "sync/atomic" "time" "github.com/cockroachdb/errors" "github.com/golang/protobuf/proto" "github.com/samber/lo" "go.opentelemetry.io/otel" + "go.uber.org/atomic" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -75,10 +75,11 @@ type clusteringCompactionTask struct { plan *datapb.CompactionPlan // schedule - totalBufferSize atomic.Int64 + totalBufferSize *atomic.Int64 spillChan chan SpillSignal - spillCount atomic.Int64 + spillCount *atomic.Int64 pool *conc.Pool[any] + writeRowNum *atomic.Int64 // inner field collectionID int64 @@ -136,10 +137,12 @@ func NewClusteringCompactionTask( plan: plan, tr: timerecord.NewTimeRecorder("clustering_compaction"), done: make(chan struct{}, 1), - totalBufferSize: atomic.Int64{}, + totalBufferSize: atomic.NewInt64(0), + spillCount: atomic.NewInt64(0), spillChan: make(chan SpillSignal, 100), clusterBuffers: make([]*ClusterBuffer, 0), clusterBufferLocks: lock.NewKeyLock[int](), + writeRowNum: atomic.NewInt64(0), } } @@ -589,6 +592,7 @@ func (t *clusteringCompactionTask) mappingSegment( zap.Int64("remained_entities", remained), zap.Int64("deleted_entities", deleted), zap.Int64("expired_entities", expired), + zap.Int64("writeRowNum", t.writeRowNum.Load()), zap.Duration("elapse", time.Since(processStart))) return nil } @@ -607,6 +611,7 @@ func (t *clusteringCompactionTask) writeToBuffer(ctx context.Context, clusterBuf if err != nil { return err } + t.writeRowNum.Inc() clusterBuffer.bufferSize.Add(int64(rowSize)) clusterBuffer.bufferRowNum.Add(1) t.totalBufferSize.Add(int64(rowSize)) @@ -783,7 +788,8 @@ func (t *clusteringCompactionTask) spill(ctx context.Context, buffer *ClusterBuf buffer.bufferSize.Store(0) buffer.bufferRowNum.Store(0) - log.Info("finish spill binlogs") + t.spillCount.Inc() + log.Info("finish spill binlogs", zap.Int64("spillCount", t.spillCount.Load())) if buffer.flushedRowNum > t.plan.GetMaxSegmentRows() { if err := t.packBufferToSegment(ctx, buffer); err != nil { return err