
Commit

fix comment
wayblink committed May 22, 2024
1 parent 093469c commit fa62941
Showing 2 changed files with 43 additions and 28 deletions.
58 changes: 31 additions & 27 deletions internal/datanode/clustering_compactor.go
@@ -19,6 +19,7 @@ package datanode
import (
"context"
"fmt"
"math"
"path"
"sort"
"strconv"
@@ -143,7 +144,6 @@ func newClusteringCompactionTask(
done: make(chan struct{}, 1),
totalBufferSize: atomic.Int64{},
spillChan: make(chan SpillSignal, 100),
pool: conc.NewPool[any](hardware.GetCPUNum() * 2),
clusterBuffers: make([]*ClusterBuffer, 0),
clusterBufferLocks: lock.NewKeyLock[int](),
}
@@ -201,7 +201,9 @@ func (t *clusteringCompactionTask) init() error {
t.isVectorClusteringKey = typeutil.IsVectorType(t.clusteringKeyField.DataType)
t.currentTs = tsoutil.GetCurrentTime()
t.memoryBufferSize = t.getMemoryBufferSize()
log.Info("clustering compaction memory buffer", zap.Int64("size", t.memoryBufferSize))
workerPoolSize := t.getWorkerPoolSize()
t.pool = conc.NewPool[any](workerPoolSize)
log.Info("clustering compaction task initialed", zap.Int64("memory_buffer_size", t.memoryBufferSize), zap.Int("worker_pool_size", workerPoolSize))

return nil
}
@@ -579,19 +581,21 @@ func (t *clusteringCompactionTask) mappingSegment(
}
remained++

currentSize := t.totalBufferSize.Load()
// trigger spill
if int64(clusterBuffer.buffer.GetRowNum()) > t.plan.GetMaxSegmentRows() ||
clusterBuffer.buffer.GetMemorySize() > Params.DataNodeCfg.BinLogMaxSize.GetAsInt() {
// reach segment/binlog max size
t.spillChan <- SpillSignal{
buffer: clusterBuffer,
}
} else if t.totalBufferSize.Load() >= t.memoryBufferSize {
} else if currentSize >= t.getMemoryBufferMiddleWatermark() {
// reach spill trigger threshold
t.spillChan <- SpillSignal{}
}
// block here, wait for memory release by spill
currentSize := t.totalBufferSize.Load()
if currentSize > t.getSpillMemorySizeThreshold() {
t.spillChan <- SpillSignal{}

// if the total buffer size is too large, block here, wait for memory release by spill
if currentSize > t.getMemoryBufferHighWatermark() {
loop:
for {
select {
@@ -603,10 +607,10 @@
return nil
default:
currentSize := t.totalBufferSize.Load()
if currentSize < t.getSpillMemorySizeThreshold() {
if currentSize < t.getMemoryBufferMiddleWatermark() {
break loop
}
time.Sleep(time.Millisecond * 50)
time.Sleep(time.Millisecond * 200)
}
}
}
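The hunk above replaces the old single 0.8x spill threshold with two levels: reaching the middle watermark requests an asynchronous spill, and crossing the high watermark blocks the writer until spilling brings usage back below the middle watermark. Below is a minimal, self-contained sketch of that backpressure pattern; the names (bufferTracker, spillSignal) and sizes are illustrative and are not the Milvus types.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

type spillSignal struct{}

type bufferTracker struct {
	total     atomic.Int64
	limit     int64
	spillChan chan spillSignal
}

func (b *bufferTracker) middleWatermark() int64 { return int64(float64(b.limit) * 0.5) }
func (b *bufferTracker) highWatermark() int64   { return int64(float64(b.limit) * 0.9) }

// write adds size bytes of buffered data, requests an asynchronous spill once
// the middle watermark is reached, and blocks above the high watermark until
// the spiller has brought usage back below the middle watermark.
func (b *bufferTracker) write(size int64) {
	cur := b.total.Add(size)
	if cur >= b.middleWatermark() {
		select {
		case b.spillChan <- spillSignal{}: // best-effort, non-blocking request
		default:
		}
	}
	if cur > b.highWatermark() {
		for b.total.Load() >= b.middleWatermark() {
			time.Sleep(10 * time.Millisecond)
		}
	}
}

func main() {
	b := &bufferTracker{limit: 1000, spillChan: make(chan spillSignal, 1)}
	go func() { // background "spiller": frees all buffered data when signalled
		for range b.spillChan {
			b.total.Store(0)
		}
	}()
	for i := 0; i < 10; i++ {
		b.write(300)
	}
	fmt.Println("buffered bytes after writes:", b.total.Load())
}
```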
@@ -676,13 +680,21 @@ func (t *clusteringCompactionTask) writeToBuffer(ctx context.Context, clusterBuf
return nil
}

func (t *clusteringCompactionTask) getWorkerPoolSize() int {
return int(math.Max(float64(Params.DataNodeCfg.ClusteringCompactionWorkerPoolSize.GetAsInt()), 1.0))
}

// getMemoryBufferSize return memoryBufferSize
func (t *clusteringCompactionTask) getMemoryBufferSize() int64 {
return int64(float64(hardware.GetMemoryCount()) * Params.DataNodeCfg.ClusteringCompactionMemoryBufferRatio.GetAsFloat())
}

func (t *clusteringCompactionTask) getSpillMemorySizeThreshold() int64 {
return int64(float64(t.memoryBufferSize) * 0.8)
func (t *clusteringCompactionTask) getMemoryBufferMiddleWatermark() int64 {
return int64(float64(t.memoryBufferSize) * 0.5)
}

func (t *clusteringCompactionTask) getMemoryBufferHighWatermark() int64 {
return int64(float64(t.memoryBufferSize) * 0.9)
}
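A worked example with illustrative numbers (not measured values): with ClusteringCompactionMemoryBufferRatio at its 0.1 default on a node reporting 64 GiB of memory, memoryBufferSize comes to roughly 6.4 GiB, so the middle watermark (0.5x) is about 3.2 GiB and the high watermark (0.9x) about 5.8 GiB, compared with the single 0.8x threshold (about 5.1 GiB) that getSpillMemorySizeThreshold used before this change.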

func (t *clusteringCompactionTask) backgroundSpill(ctx context.Context) {
@@ -720,18 +732,16 @@ func (t *clusteringCompactionTask) spillLargestBuffers(ctx context.Context) erro
bufferIDs := make([]int, 0)
for _, buffer := range t.clusterBuffers {
bufferIDs = append(bufferIDs, buffer.id)
t.clusterBufferLocks.Lock(buffer.id)
}
defer func() {
for _, buffer := range t.clusterBuffers {
t.clusterBufferLocks.Unlock(buffer.id)
}
}()
sort.Slice(bufferIDs, func(i, j int) bool {
return t.clusterBuffers[i].buffer.GetMemorySize() > t.clusterBuffers[j].buffer.GetMemorySize()
})
for index, id := range bufferIDs {
err := t.spill(ctx, t.clusterBuffers[id])
for index, bufferId := range bufferIDs {
err := func() error {
t.clusterBufferLocks.Lock(bufferId)
defer t.clusterBufferLocks.Unlock(bufferId)
return t.spill(ctx, t.clusterBuffers[bufferId])
}()
if err != nil {
return err
}
@@ -746,16 +756,10 @@ func (t *clusteringCompactionTask) spillAll(ctx context.Context) error {
// only one spillLargestBuffers or spillAll should do at the same time
t.spillMutex.Lock()
defer t.spillMutex.Unlock()
for _, buffer := range t.clusterBuffers {
t.clusterBufferLocks.Lock(buffer.id)
}
defer func() {
for _, buffer := range t.clusterBuffers {
t.clusterBufferLocks.Unlock(buffer.id)
}
}()
for _, buffer := range t.clusterBuffers {
err := func() error {
t.clusterBufferLocks.Lock(buffer.id)
defer t.clusterBufferLocks.Unlock(buffer.id)
err := t.spill(ctx, buffer)
if err != nil {
log.Error("spill fail")
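The spillLargestBuffers and spillAll changes above stop taking every buffer's lock up front; each buffer is now locked inside a small closure so the deferred unlock releases it before the loop moves on to the next buffer. A hypothetical sketch of that per-item locking pattern follows, with illustrative names rather than the Milvus types.

```go
package main

import (
	"fmt"
	"sync"
)

type buffer struct {
	mu   sync.Mutex
	id   int
	data []byte
}

// flush locks one buffer, flushes it, and releases the lock via defer
// before the caller proceeds to the next buffer.
func flush(b *buffer) error {
	return func() error {
		b.mu.Lock()
		defer b.mu.Unlock()
		fmt.Printf("flushing buffer %d (%d bytes)\n", b.id, len(b.data))
		b.data = b.data[:0]
		return nil
	}()
}

func main() {
	buffers := []*buffer{{id: 0, data: make([]byte, 10)}, {id: 1, data: make([]byte, 20)}}
	for _, b := range buffers {
		if err := flush(b); err != nil {
			fmt.Println("flush failed:", err)
			return
		}
	}
}
```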
13 changes: 12 additions & 1 deletion pkg/util/paramtable/component_param.go
@@ -3580,6 +3580,7 @@ type dataNodeConfig struct {

// clustering compaction
ClusteringCompactionMemoryBufferRatio ParamItem `refreshable:"true"`
ClusteringCompactionWorkerPoolSize ParamItem `refreshable:"true"`
}

func (p *dataNodeConfig) init(base *BaseTable) {
@@ -3897,13 +3898,23 @@ if this parameter <= 0, will set it as 10`,

p.ClusteringCompactionMemoryBufferRatio = ParamItem{
Key: "datanode.clusteringCompaction.memoryBufferRatio",
Version: "2.4.0",
Version: "2.4.2",
Doc: "The ratio of memory buffer of clustering compaction. Data larger than threshold will be spilled to storage.",
DefaultValue: "0.1",
PanicIfEmpty: false,
Export: true,
}
p.ClusteringCompactionMemoryBufferRatio.Init(base.mgr)

p.ClusteringCompactionWorkerPoolSize = ParamItem{
Key: "datanode.clusteringCompaction.cpu",
Version: "2.4.2",
Doc: "worker pool size for one clustering compaction job.",
DefaultValue: "1",
PanicIfEmpty: false,
Export: true,
}
p.ClusteringCompactionWorkerPoolSize.Init(base.mgr)
}

// /////////////////////////////////////////////////////////////////////////////
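The new datanode.clusteringCompaction.cpu parameter feeds getWorkerPoolSize, which clamps the configured value to at least 1 before conc.NewPool is called. The sketch below, using assumed names and a plain goroutine pool instead of the Milvus conc package, shows why that clamp matters: a zero or negative configured size would otherwise leave the job with no workers.

```go
package main

import (
	"fmt"
	"math"
	"sync"
)

// poolSizeFromConfig mirrors the max(configured, 1) clamp used in getWorkerPoolSize.
func poolSizeFromConfig(configured int) int {
	return int(math.Max(float64(configured), 1.0))
}

func main() {
	size := poolSizeFromConfig(0) // e.g. parameter unset or misconfigured
	jobs := make(chan int)
	var wg sync.WaitGroup
	for i := 0; i < size; i++ {
		wg.Add(1)
		go func(worker int) {
			defer wg.Done()
			for j := range jobs {
				fmt.Printf("worker %d handled job %d\n", worker, j)
			}
		}(i)
	}
	for j := 0; j < 4; j++ {
		jobs <- j
	}
	close(jobs)
	wg.Wait()
}
```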
