enhance: Optimize the task scheduling logic in DataCoord #39084

Open · wants to merge 2 commits into master
3 changes: 3 additions & 0 deletions configs/milvus.yaml
@@ -633,6 +633,9 @@ dataCoord:
clusteringCompactionUsage: 16 # slot usage of clustering compaction job.
mixCompactionUsage: 8 # slot usage of mix compaction job.
l0DeleteCompactionUsage: 8 # slot usage of l0 compaction job.
indexTaskSlotUsage: 16 # slot usage of index task
statsTaskSlotUsage: 4 # slot usage of stats task
analyzeTaskSlotUsage: 16 # slot usage of analyze task
ip: # TCP/IP address of dataCoord. If not specified, use the first unicastable address
port: 13333 # TCP port of dataCoord
grpc:
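The three new knobs cap how many worker slots one task of each type consumes. A minimal sketch of how they surface in code, assuming the stats accessor follows the same pattern as the two visible in the hunks below (only IndexTaskSlotUsage and AnalyzeTaskSlotUsage appear in this diff):

indexSlot := Params.DataCoordCfg.IndexTaskSlotUsage.GetAsInt64()     // default 16
statsSlot := Params.DataCoordCfg.StatsTaskSlotUsage.GetAsInt64()     // default 4; accessor name assumed
analyzeSlot := Params.DataCoordCfg.AnalyzeTaskSlotUsage.GetAsInt64() // default 16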
47 changes: 47 additions & 0 deletions internal/datacoord/session/indexnode_manager.go
@@ -44,11 +44,18 @@
RemoveNode(nodeID typeutil.UniqueID)
StoppingNode(nodeID typeutil.UniqueID)
PickClient() (typeutil.UniqueID, types.IndexNodeClient)
QuerySlots() []WorkerSlots
ClientSupportDisk() bool
GetAllClients() map[typeutil.UniqueID]types.IndexNodeClient
GetClientByID(nodeID typeutil.UniqueID) (types.IndexNodeClient, bool)
}

type WorkerSlots struct {
NodeID int64
TotalSlots int64
AvailableSlots int64
}

// IndexNodeManager is used to manage the client of IndexNode.
type IndexNodeManager struct {
nodeClients map[typeutil.UniqueID]types.IndexNodeClient
@@ -115,6 +122,46 @@
return nil
}

func (nm *IndexNodeManager) QuerySlots() []WorkerSlots {
nm.lock.Lock()
defer nm.lock.Unlock()

ctx, cancel := context.WithTimeout(context.Background(), querySlotTimeout)
defer cancel()

nodeSlots := make([]WorkerSlots, 0)
mu := &sync.Mutex{}
wg := &sync.WaitGroup{}
for nodeID, client := range nm.nodeClients {
if _, ok := nm.stoppingNodes[nodeID]; !ok {
wg.Add(1)
go func(nodeID int64) {
defer wg.Done()
resp, err := client.GetJobStats(ctx, &workerpb.GetJobStatsRequest{})
if err != nil {
log.Warn("get IndexNode slots failed", zap.Int64("nodeID", nodeID), zap.Error(err))
return
}
if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
log.Warn("get IndexNode slots failed", zap.Int64("nodeID", nodeID),
zap.String("reason", resp.GetStatus().GetReason()))
return
}

mu.Lock()
defer mu.Unlock()
nodeSlots = append(nodeSlots, WorkerSlots{
NodeID: nodeID,
TotalSlots: resp.GetTotalSlots(),
AvailableSlots: resp.GetTaskSlots(),
})
}(nodeID)
}
}
wg.Wait()
log.Ctx(context.TODO()).Debug("query slot done", zap.Any("nodeSlots", nodeSlots))
return nodeSlots
}
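
QuerySlots fans out GetJobStats to every non-stopping IndexNode in parallel and gathers per-node capacity. A minimal sketch of how a caller might pick a worker from the result, assuming a greedy most-free-capacity policy; pickNode is hypothetical and not part of this PR:

// pickNode returns the node with the most available slots that can still
// fit taskSlot, or -1 if no node currently has enough free capacity.
func pickNode(slots []WorkerSlots, taskSlot int64) int64 {
	best, bestFree := int64(-1), int64(0)
	for _, s := range slots {
		if s.AvailableSlots >= taskSlot && s.AvailableSlots > bestFree {
			best, bestFree = s.NodeID, s.AvailableSlots
		}
	}
	return best
}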

func (nm *IndexNodeManager) PickClient() (typeutil.UniqueID, types.IndexNodeClient) {
nm.lock.Lock()
defer nm.lock.Unlock()
47 changes: 47 additions & 0 deletions internal/datacoord/session/mock_worker_manager.go

Some generated files are not rendered by default.

16 changes: 9 additions & 7 deletions internal/datacoord/task_analyze.go
@@ -133,12 +133,12 @@
return nil
}

func (at *analyzeTask) PreCheck(ctx context.Context, dependency *taskScheduler) bool {
func (at *analyzeTask) PreCheck(ctx context.Context, dependency *taskScheduler) (bool, int64) {
t := dependency.meta.analyzeMeta.GetTask(at.GetTaskID())
if t == nil {
log.Ctx(ctx).Info("task is nil, delete it", zap.Int64("taskID", at.GetTaskID()))
at.SetState(indexpb.JobState_JobStateNone, "analyze task is nil")
return false
return false, 0
}

at.req = &workerpb.AnalyzeRequest{
@@ -170,7 +170,7 @@
log.Ctx(ctx).Warn("analyze stats task is processing, but segment is nil, delete the task",
zap.Int64("taskID", at.GetTaskID()), zap.Int64("segmentID", segID))
at.SetState(indexpb.JobState_JobStateFailed, fmt.Sprintf("segmentInfo with ID: %d is nil", segID))
return false
return false, 0
}

totalSegmentsRows += info.GetNumOfRows()
@@ -188,7 +188,7 @@
log.Ctx(ctx).Warn("analyze task get collection info failed", zap.Int64("collectionID",
segments[0].GetCollectionID()), zap.Error(err))
at.SetState(indexpb.JobState_JobStateInit, err.Error())
return false
return false, 0
}

schema := collInfo.Schema
@@ -203,7 +203,7 @@
dim, err := storage.GetDimFromParams(field.TypeParams)
if err != nil {
at.SetState(indexpb.JobState_JobStateInit, err.Error())
return false
return false, 0
}
at.req.Dim = int64(dim)

@@ -212,7 +212,7 @@
if numClusters < Params.DataCoordCfg.ClusteringCompactionMinCentroidsNum.GetAsInt64() {
log.Ctx(ctx).Info("data size is too small, skip analyze task", zap.Float64("raw data size", totalSegmentsRawDataSize), zap.Int64("num clusters", numClusters), zap.Int64("minimum num clusters required", Params.DataCoordCfg.ClusteringCompactionMinCentroidsNum.GetAsInt64()))
at.SetState(indexpb.JobState_JobStateFinished, "")
return false
return false, 0
}
if numClusters > Params.DataCoordCfg.ClusteringCompactionMaxCentroidsNum.GetAsInt64() {
numClusters = Params.DataCoordCfg.ClusteringCompactionMaxCentroidsNum.GetAsInt64()
@@ -223,8 +223,10 @@
at.req.MinClusterSizeRatio = Params.DataCoordCfg.ClusteringCompactionMinClusterSizeRatio.GetAsFloat()
at.req.MaxClusterSizeRatio = Params.DataCoordCfg.ClusteringCompactionMaxClusterSizeRatio.GetAsFloat()
at.req.MaxClusterSize = Params.DataCoordCfg.ClusteringCompactionMaxClusterSize.GetAsSize()
taskSlot := Params.DataCoordCfg.AnalyzeTaskSlotUsage.GetAsInt64()
at.req.TaskSlot = taskSlot

return true
return true, taskSlot
}
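
PreCheck now reports the task's slot cost alongside the pass/fail result, so the scheduler can check worker capacity before dispatch. A sketch of the consuming side under stated assumptions: the scheduler's nodeManager field and its process loop are not shown in this diff, and pickNode is the hypothetical helper sketched above:

// Hypothetical scheduler-side use of the new (bool, int64) contract.
ok, taskSlot := task.PreCheck(ctx, scheduler)
if !ok {
	return // PreCheck already dropped, finished, or reset the task
}
nodeID := pickNode(scheduler.nodeManager.QuerySlots(), taskSlot)
if nodeID == -1 {
	return // no worker has enough free slots; retry on the next tick
}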

func (at *analyzeTask) AssignTask(ctx context.Context, client types.IndexNodeClient) bool {
32 changes: 23 additions & 9 deletions internal/datacoord/task_index.go
@@ -132,19 +132,19 @@
return meta.indexMeta.BuildIndex(it.taskID)
}

func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskScheduler) bool {
func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskScheduler) (bool, int64) {
segIndex, exist := dependency.meta.indexMeta.GetIndexJob(it.taskID)
if !exist || segIndex == nil {
log.Ctx(ctx).Info("index task has not exist in meta table, remove task", zap.Int64("taskID", it.taskID))
it.SetState(indexpb.JobState_JobStateNone, "index task has not exist in meta table")
return false
return false, 0
}

segment := dependency.meta.GetSegment(ctx, segIndex.SegmentID)
if !isSegmentHealthy(segment) || !dependency.meta.indexMeta.IsIndexExist(segIndex.CollectionID, segIndex.IndexID) {
log.Ctx(ctx).Info("task is no need to build index, remove it", zap.Int64("taskID", it.taskID))
it.SetState(indexpb.JobState_JobStateNone, "task is no need to build index")
return false
return false, 0
}
indexParams := dependency.meta.indexMeta.GetIndexParams(segIndex.CollectionID, segIndex.IndexID)
indexType := GetIndexType(indexParams)
@@ -154,7 +154,7 @@
it.SetStartTime(time.Now())
it.SetEndTime(time.Now())
it.SetState(indexpb.JobState_JobStateFinished, "fake finished index success")
return false
return false, 0
}

typeParams := dependency.meta.indexMeta.GetTypeParams(segIndex.CollectionID, segIndex.IndexID)
@@ -170,7 +170,7 @@
if ret != nil {
log.Ctx(ctx).Warn("failed to update index build params defined in yaml", zap.Int64("taskID", it.taskID), zap.Error(ret))
it.SetState(indexpb.JobState_JobStateInit, ret.Error())
return false
return false, 0
}
}

@@ -180,14 +180,14 @@
if err != nil {
log.Ctx(ctx).Warn("failed to append index build params", zap.Int64("taskID", it.taskID), zap.Error(err))
it.SetState(indexpb.JobState_JobStateInit, err.Error())
return false
return false, 0
}
}

collectionInfo, err := dependency.handler.GetCollection(ctx, segment.GetCollectionID())
if err != nil {
log.Ctx(ctx).Info("index builder get collection info failed", zap.Int64("collectionID", segment.GetCollectionID()), zap.Error(err))
return false
return false, 0
}

schema := collectionInfo.Schema
@@ -215,7 +215,7 @@
if collectionInfo == nil {
log.Ctx(ctx).Warn("get collection failed", zap.Int64("collID", segIndex.CollectionID), zap.Error(err))
it.SetState(indexpb.JobState_JobStateInit, err.Error())
return true
return true, 0
}
partitionKeyField, _ := typeutil.GetPartitionKeyFieldSchema(schema)
if partitionKeyField != nil && typeutil.IsFieldDataTypeSupportMaterializedView(partitionKeyField) {
@@ -235,6 +235,8 @@
}
}

taskSlot := calculateIndexTaskSlot(segment.getSegmentSize())

it.req = &workerpb.CreateJobRequest{
ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(),
IndexFilePrefix: path.Join(dependency.chunkManager.RootPath(), common.SegmentIndexPath),
Expand All @@ -257,11 +259,23 @@
OptionalScalarFields: optionalFields,
Field: field,
PartitionKeyIsolation: partitionKeyIsolation,
TaskSlot: taskSlot,
}

log.Ctx(ctx).Info("index task pre check successfully", zap.Int64("taskID", it.GetTaskID()),
zap.Int64("segID", segment.GetID()))
return true
return true, taskSlot
}

func calculateIndexTaskSlot(segmentSize int64) int64 {
if segmentSize > 500*1024*1024 {
return max(Params.DataCoordCfg.IndexTaskSlotUsage.GetAsInt64(), 1)
} else if segmentSize > 100*1024*1024 {
return max(Params.DataCoordCfg.IndexTaskSlotUsage.GetAsInt64()/2, 1)
} else if segmentSize > 10*1024*1024 {
return max(Params.DataCoordCfg.IndexTaskSlotUsage.GetAsInt64()/4, 1)
}
return max(Params.DataCoordCfg.IndexTaskSlotUsage.GetAsInt64()/8, 1)
}
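
With the default indexTaskSlotUsage of 16 from milvus.yaml above, the tiers resolve to 16, 8, 4, and 2 slots, and the max(..., 1) floor keeps the cost at least 1 even when the configured usage is smaller than the divisor. An illustrative fragment (assumes "fmt" is imported):

// Prints, with default config: 600 MB -> 16, 200 MB -> 8, 50 MB -> 4, 1 MB -> 2.
for _, mb := range []int64{600, 200, 50, 1} {
	fmt.Printf("%d MB -> %d slots\n", mb, calculateIndexTaskSlot(mb*1024*1024))
}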

func (it *indexBuildTask) AssignTask(ctx context.Context, client types.IndexNodeClient) bool {