From fa62941283e314f7e593710436e35011982fcf88 Mon Sep 17 00:00:00 2001
From: wayblink
Date: Mon, 20 May 2024 17:37:02 +0800
Subject: [PATCH] fix comment

---
 internal/datanode/clustering_compactor.go | 58 ++++++++++++-----------
 pkg/util/paramtable/component_param.go    | 13 ++++-
 2 files changed, 43 insertions(+), 28 deletions(-)

diff --git a/internal/datanode/clustering_compactor.go b/internal/datanode/clustering_compactor.go
index fd85a63cd6be2..49e297d6fc9c5 100644
--- a/internal/datanode/clustering_compactor.go
+++ b/internal/datanode/clustering_compactor.go
@@ -19,6 +19,7 @@ package datanode
 import (
 	"context"
 	"fmt"
+	"math"
 	"path"
 	"sort"
 	"strconv"
@@ -143,7 +144,6 @@ func newClusteringCompactionTask(
 		done:               make(chan struct{}, 1),
 		totalBufferSize:    atomic.Int64{},
 		spillChan:          make(chan SpillSignal, 100),
-		pool:               conc.NewPool[any](hardware.GetCPUNum() * 2),
 		clusterBuffers:     make([]*ClusterBuffer, 0),
 		clusterBufferLocks: lock.NewKeyLock[int](),
 	}
@@ -201,7 +201,9 @@ func (t *clusteringCompactionTask) init() error {
 	t.isVectorClusteringKey = typeutil.IsVectorType(t.clusteringKeyField.DataType)
 	t.currentTs = tsoutil.GetCurrentTime()
 	t.memoryBufferSize = t.getMemoryBufferSize()
-	log.Info("clustering compaction memory buffer", zap.Int64("size", t.memoryBufferSize))
+	workerPoolSize := t.getWorkerPoolSize()
+	t.pool = conc.NewPool[any](workerPoolSize)
+	log.Info("clustering compaction task initialized", zap.Int64("memory_buffer_size", t.memoryBufferSize), zap.Int("worker_pool_size", workerPoolSize))
 	return nil
 }
 
@@ -579,19 +581,21 @@ func (t *clusteringCompactionTask) mappingSegment(
 			}
 			remained++
 
+			currentSize := t.totalBufferSize.Load()
 			// trigger spill
 			if int64(clusterBuffer.buffer.GetRowNum()) > t.plan.GetMaxSegmentRows() ||
 				clusterBuffer.buffer.GetMemorySize() > Params.DataNodeCfg.BinLogMaxSize.GetAsInt() {
+				// reach segment/binlog max size
 				t.spillChan <- SpillSignal{
 					buffer: clusterBuffer,
 				}
-			} else if t.totalBufferSize.Load() >= t.memoryBufferSize {
+			} else if currentSize >= t.getMemoryBufferMiddleWatermark() {
+				// reach spill trigger threshold
 				t.spillChan <- SpillSignal{}
 			}
-			// block here, wait for memory release by spill
-			currentSize := t.totalBufferSize.Load()
-			if currentSize > t.getSpillMemorySizeThreshold() {
-				t.spillChan <- SpillSignal{}
+
+			// if the total buffer size is too large, block here and wait for memory to be released by spill
+			if currentSize > t.getMemoryBufferHighWatermark() {
 			loop:
 				for {
 					select {
@@ -603,10 +607,10 @@ func (t *clusteringCompactionTask) mappingSegment(
 						return nil
 					default:
 						currentSize := t.totalBufferSize.Load()
-						if currentSize < t.getSpillMemorySizeThreshold() {
+						if currentSize < t.getMemoryBufferMiddleWatermark() {
 							break loop
 						}
-						time.Sleep(time.Millisecond * 50)
+						time.Sleep(time.Millisecond * 200)
 					}
 				}
 			}
@@ -676,13 +680,21 @@ func (t *clusteringCompactionTask) writeToBuffer(ctx context.Context, clusterBuf
 	return nil
 }
 
+func (t *clusteringCompactionTask) getWorkerPoolSize() int {
+	return int(math.Max(float64(Params.DataNodeCfg.ClusteringCompactionWorkerPoolSize.GetAsInt()), 1.0))
+}
+
 // getMemoryBufferSize return memoryBufferSize
 func (t *clusteringCompactionTask) getMemoryBufferSize() int64 {
 	return int64(float64(hardware.GetMemoryCount()) * Params.DataNodeCfg.ClusteringCompactionMemoryBufferRatio.GetAsFloat())
 }
 
-func (t *clusteringCompactionTask) getSpillMemorySizeThreshold() int64 {
-	return int64(float64(t.memoryBufferSize) * 0.8)
+func (t *clusteringCompactionTask) getMemoryBufferMiddleWatermark() int64 {
+	return int64(float64(t.memoryBufferSize) * 0.5)
+}
+
+func (t *clusteringCompactionTask) getMemoryBufferHighWatermark() int64 {
+	return int64(float64(t.memoryBufferSize) * 0.9)
 }
 
 func (t *clusteringCompactionTask) backgroundSpill(ctx context.Context) {
@@ -720,18 +732,16 @@ func (t *clusteringCompactionTask) spillLargestBuffers(ctx context.Context) erro
 	bufferIDs := make([]int, 0)
 	for _, buffer := range t.clusterBuffers {
 		bufferIDs = append(bufferIDs, buffer.id)
-		t.clusterBufferLocks.Lock(buffer.id)
 	}
-	defer func() {
-		for _, buffer := range t.clusterBuffers {
-			t.clusterBufferLocks.Unlock(buffer.id)
-		}
-	}()
 	sort.Slice(bufferIDs, func(i, j int) bool {
 		return t.clusterBuffers[i].buffer.GetMemorySize() > t.clusterBuffers[j].buffer.GetMemorySize()
 	})
-	for index, id := range bufferIDs {
-		err := t.spill(ctx, t.clusterBuffers[id])
+	for index, bufferId := range bufferIDs {
+		err := func() error {
+			t.clusterBufferLocks.Lock(bufferId)
+			defer t.clusterBufferLocks.Unlock(bufferId)
+			return t.spill(ctx, t.clusterBuffers[bufferId])
+		}()
 		if err != nil {
 			return err
 		}
@@ -746,16 +756,10 @@ func (t *clusteringCompactionTask) spillAll(ctx context.Context) error {
 	// only one spillLargestBuffers or spillAll should do at the same time
 	t.spillMutex.Lock()
 	defer t.spillMutex.Unlock()
-	for _, buffer := range t.clusterBuffers {
-		t.clusterBufferLocks.Lock(buffer.id)
-	}
-	defer func() {
-		for _, buffer := range t.clusterBuffers {
-			t.clusterBufferLocks.Unlock(buffer.id)
-		}
-	}()
 	for _, buffer := range t.clusterBuffers {
 		err := func() error {
+			t.clusterBufferLocks.Lock(buffer.id)
+			defer t.clusterBufferLocks.Unlock(buffer.id)
 			err := t.spill(ctx, buffer)
 			if err != nil {
 				log.Error("spill fail")
diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go
index 3c5d874d33007..d9dc2d47fb1a6 100644
--- a/pkg/util/paramtable/component_param.go
+++ b/pkg/util/paramtable/component_param.go
@@ -3580,6 +3580,7 @@ type dataNodeConfig struct {
 
 	// clustering compaction
 	ClusteringCompactionMemoryBufferRatio ParamItem `refreshable:"true"`
+	ClusteringCompactionWorkerPoolSize    ParamItem `refreshable:"true"`
 }
 
 func (p *dataNodeConfig) init(base *BaseTable) {
@@ -3897,13 +3898,23 @@ if this parameter <= 0, will set it as 10`,
 
 	p.ClusteringCompactionMemoryBufferRatio = ParamItem{
 		Key:          "datanode.clusteringCompaction.memoryBufferRatio",
-		Version:      "2.4.0",
+		Version:      "2.4.2",
 		Doc:          "The ratio of memory buffer of clustering compaction. Data larger than threshold will be spilled to storage.",
 		DefaultValue: "0.1",
 		PanicIfEmpty: false,
 		Export:       true,
 	}
 	p.ClusteringCompactionMemoryBufferRatio.Init(base.mgr)
+
+	p.ClusteringCompactionWorkerPoolSize = ParamItem{
+		Key:          "datanode.clusteringCompaction.cpu",
+		Version:      "2.4.2",
+		Doc:          "Worker pool size for one clustering compaction job.",
+		DefaultValue: "1",
+		PanicIfEmpty: false,
+		Export:       true,
+	}
+	p.ClusteringCompactionWorkerPoolSize.Init(base.mgr)
 }
 
 // /////////////////////////////////////////////////////////////////////////////
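
Reviewer note: the core behavioral change above replaces the old single spill threshold (0.8 * memoryBufferSize) with two watermarks. Crossing the middle watermark (50%) sends an asynchronous spill signal; crossing the high watermark (90%) blocks the write path until background spilling drains usage back below the middle mark. Below is a minimal, self-contained Go sketch of that pattern; all names (bufferedWriter, spillSignal, write) are hypothetical stand-ins for the production types, not the patch's code.

    package main

    import (
        "fmt"
        "sync/atomic"
        "time"
    )

    type spillSignal struct{}

    // bufferedWriter stands in for clusteringCompactionTask: total mirrors
    // totalBufferSize and limit mirrors memoryBufferSize.
    type bufferedWriter struct {
        total     atomic.Int64
        limit     int64
        spillChan chan spillSignal
    }

    func (w *bufferedWriter) middleWatermark() int64 { return int64(float64(w.limit) * 0.5) }
    func (w *bufferedWriter) highWatermark() int64   { return int64(float64(w.limit) * 0.9) }

    func (w *bufferedWriter) write(n int64) {
        cur := w.total.Add(n)
        if cur >= w.middleWatermark() {
            // soft trigger: hint the background spiller without blocking the writer
            select {
            case w.spillChan <- spillSignal{}:
            default:
            }
        }
        // hard trigger: block until spilling drains usage back below the middle
        // watermark (the patch polls every 200ms and also checks for cancellation)
        if w.total.Load() > w.highWatermark() {
            for w.total.Load() >= w.middleWatermark() {
                time.Sleep(10 * time.Millisecond)
            }
        }
    }

    func main() {
        w := &bufferedWriter{limit: 1000, spillChan: make(chan spillSignal, 100)}
        go func() { // background spiller, in the spirit of backgroundSpill
            for range w.spillChan {
                w.total.Store(0) // pretend every buffer was flushed to storage
            }
        }()
        for i := 0; i < 50; i++ {
            w.write(100)
        }
        fmt.Println("remaining buffered bytes:", w.total.Load())
    }

The non-blocking send on the soft trigger keeps the hot write path from stalling when a spill is already pending; only the high watermark applies real backpressure.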
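
The locking change in spillLargestBuffers/spillAll is also worth calling out: instead of taking every cluster-buffer lock up front and holding all of them for the whole pass, each buffer is now locked only while it is being spilled. A small illustrative sketch follows, with a hypothetical clusterBuffer type and a plain mutex map standing in for lock.KeyLock[int]:

    package main

    import (
        "fmt"
        "sync"
    )

    type clusterBuffer struct {
        id   int
        rows []string
    }

    func spill(b *clusterBuffer) error {
        fmt.Printf("spilled buffer %d (%d rows)\n", b.id, len(b.rows))
        b.rows = b.rows[:0]
        return nil
    }

    // spillAll locks each buffer only for the duration of its own spill; the
    // immediately-invoked closure makes defer release the lock per iteration
    // rather than at the end of the whole call.
    func spillAll(buffers []*clusterBuffer, locks map[int]*sync.Mutex) error {
        for _, b := range buffers {
            if err := func() error {
                locks[b.id].Lock()
                defer locks[b.id].Unlock()
                return spill(b)
            }(); err != nil {
                return err
            }
        }
        return nil
    }

    func main() {
        buffers := []*clusterBuffer{
            {id: 0, rows: []string{"a"}},
            {id: 1, rows: []string{"b", "c"}},
        }
        locks := map[int]*sync.Mutex{0: {}, 1: {}}
        if err := spillAll(buffers, locks); err != nil {
            fmt.Println("spill failed:", err)
        }
    }

Besides shrinking the critical section, this removes the window in the old code where one spill call held every buffer lock while draining buffers one by one.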
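
Finally, getWorkerPoolSize clamps the refreshable datanode.clusteringCompaction.cpu setting to at least one worker, so a zero or negative value cannot produce an empty pool. The same guard in isolation (hypothetical clampPoolSize helper):

    package main

    import (
        "fmt"
        "math"
    )

    // clampPoolSize mirrors the guard in getWorkerPoolSize: any configured
    // value below 1 falls back to a single worker.
    func clampPoolSize(configured int) int {
        return int(math.Max(float64(configured), 1.0))
    }

    func main() {
        for _, v := range []int{-2, 0, 1, 8} {
            fmt.Printf("configured=%d -> pool size=%d\n", v, clampPoolSize(v))
        }
    }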