diff --git a/configs/backup.yaml b/configs/backup.yaml index 3c52cefd..679e9336 100644 --- a/configs/backup.yaml +++ b/configs/backup.yaml @@ -38,4 +38,16 @@ minio: backupRootPath: "backup" # Rootpath to store backup data. Backup data will store to backupBucketName/backupRootPath backup: - maxSegmentGroupSize: 2G \ No newline at end of file + maxSegmentGroupSize: 2G + parallelism: 2 # collection level parallelism to backup, default 1 + copydata: + # thread pool to copy data for each collection backup, default 100. + # which means if you set backup.parallelism = 2 backup.copydata.parallelism = 100, there will be 200 copy executing at the same time. + # reduce it if blocks your storage's network bandwidth + parallelism: 100 + +restore: + # Collection level parallelism to restore, default 1 + # Only change it > 1 when you have more than one datanode. + # Because the max parallelism of Milvus bulkinsert is equal to datanodes' number. + parallelism: 2 \ No newline at end of file diff --git a/core/backup_context.go b/core/backup_context.go index ebe18958..c40054a0 100644 --- a/core/backup_context.go +++ b/core/backup_context.go @@ -36,7 +36,7 @@ type BackupContext struct { params paramtable.BackupParams // milvus client - milvusClient *gomilvus.Client + milvusClient *MilvusClient // data storage client storageClient *storage.ChunkManager @@ -50,7 +50,7 @@ type BackupContext struct { restoreTasks map[string]*backuppb.RestoreBackupTask - //copyWorkerPool *common.WorkerPool + //backupWorkerPool *common.WorkerPool } func CreateMilvusClient(ctx context.Context, params paramtable.BackupParams) (gomilvus.Client, error) { @@ -88,35 +88,9 @@ func CreateStorageClient(ctx context.Context, params paramtable.BackupParams) (s } func (b *BackupContext) Start() error { - // start milvus go SDK client - //milvusClient, err := CreateMilvusClient(b.ctx, b.params) - //if err != nil { - // log.Error("failed to initial milvus client", zap.Error(err)) - // return err - //} - //b.milvusClient = milvusClient - - // start milvus storage client - //minioClient, err := CreateStorageClient(b.ctx, b.params) - //if err != nil { - // log.Error("failed to initial storage client", zap.Error(err)) - // return err - //} - //b.storageClient = minioClient - b.backupTasks = sync.Map{} b.backupNameIdDict = sync.Map{} b.restoreTasks = make(map[string]*backuppb.RestoreBackupTask) - - // init worker pool - //wp, err := common.NewWorkerPool(b.ctx, WORKER_NUM, RPS) - //if err != nil { - // log.Error("failed to initial copy data woker pool", zap.Error(err)) - // return err - //} - //b.copyWorkerPool = wp - //b.copyWorkerPool.Start() - b.started = true return nil } @@ -141,16 +115,18 @@ func CreateBackupContext(ctx context.Context, params paramtable.BackupParams) *B } } -func (b *BackupContext) getMilvusClient() gomilvus.Client { +func (b *BackupContext) getMilvusClient() *MilvusClient { if b.milvusClient == nil { milvusClient, err := CreateMilvusClient(b.ctx, b.params) if err != nil { log.Error("failed to initial milvus client", zap.Error(err)) panic(err) } - b.milvusClient = &milvusClient + b.milvusClient = &MilvusClient{ + client: milvusClient, + } } - return *b.milvusClient + return b.milvusClient } func (b *BackupContext) getStorageClient() storage.ChunkManager { @@ -165,6 +141,19 @@ func (b *BackupContext) getStorageClient() storage.ChunkManager { return *b.storageClient } +//func (b *BackupContext) getBackupWorkerPool() *common.WorkerPool { +// if b.backupWorkerPool == nil { +// wp, err := common.NewWorkerPool(b.ctx, b.params.BackupCfg.BackupParallelism, RPS) +// if err != nil { +// log.Error("failed to initial copy data woker pool", zap.Error(err)) +// panic(err) +// } +// b.backupWorkerPool = wp +// b.backupWorkerPool.Start() +// } +// return b.backupWorkerPool +//} + func (b *BackupContext) GetBackup(ctx context.Context, request *backuppb.GetBackupRequest) *backuppb.BackupInfoResponse { if request.GetRequestId() == "" { request.RequestId = utils.UUID() diff --git a/core/backup_impl_create_backup.go b/core/backup_impl_create_backup.go index 4e076359..82ed1249 100644 --- a/core/backup_impl_create_backup.go +++ b/core/backup_impl_create_backup.go @@ -92,7 +92,7 @@ func (b *BackupContext) CreateBackup(ctx context.Context, request *backuppb.Crea b.backupNameIdDict.Store(name, request.GetRequestId()) if request.Async { - go b.executeCreateBackup(ctx, request, backup) + go b.executeCreateBackupV2(ctx, request, backup) asyncResp := &backuppb.BackupInfoResponse{ RequestId: request.GetRequestId(), Code: backuppb.ResponseCode_Success, @@ -101,7 +101,7 @@ func (b *BackupContext) CreateBackup(ctx context.Context, request *backuppb.Crea } return asyncResp } else { - task, err := b.executeCreateBackup(ctx, request, backup) + task, err := b.executeCreateBackupV2(ctx, request, backup) resp.Data = task if err != nil { resp.Code = backuppb.ResponseCode_Fail @@ -125,7 +125,12 @@ func (b *BackupContext) refreshBackupMeta(id string, backupInfo *backuppb.Backup return backup, nil } -type collection struct { +func (b *BackupContext) refreshBackupCache(backupInfo *backuppb.BackupInfo) { + log.Debug("refreshBackupCache", zap.String("id", backupInfo.GetId())) + b.backupTasks.Store(backupInfo.GetId(), backupInfo) +} + +type collectionStruct struct { db string collectionName string } @@ -134,12 +139,12 @@ type collection struct { // For backward compatibility: // 1,parse dbCollections first, // 2,if dbCollections not set, use collectionNames -func (b *BackupContext) parseBackupCollections(request *backuppb.CreateBackupRequest) ([]collection, error) { +func (b *BackupContext) parseBackupCollections(request *backuppb.CreateBackupRequest) ([]collectionStruct, error) { log.Debug("Request collection names", zap.Strings("request_collection_names", request.GetCollectionNames()), zap.String("request_db_collections", utils.GetCreateDBCollections(request)), zap.Int("length", len(request.GetCollectionNames()))) - var toBackupCollections []collection + var toBackupCollections []collectionStruct dbCollectionsStr := utils.GetCreateDBCollections(request) // first priority: dbCollections @@ -152,23 +157,18 @@ func (b *BackupContext) parseBackupCollections(request *backuppb.CreateBackupReq } for db, collections := range dbCollections { if len(collections) == 0 { - err := b.getMilvusClient().UsingDatabase(b.ctx, db) - if err != nil { - log.Error("fail to call SDK use database", zap.Error(err)) - return nil, err - } - collections, err := b.getMilvusClient().ListCollections(b.ctx) + collections, err := b.getMilvusClient().ListCollections(b.ctx, db) if err != nil { log.Error("fail in ListCollections", zap.Error(err)) return nil, err } for _, coll := range collections { log.Debug("Add collection to toBackupCollections", zap.String("db", db), zap.String("collection", coll.Name)) - toBackupCollections = append(toBackupCollections, collection{db, coll.Name}) + toBackupCollections = append(toBackupCollections, collectionStruct{db, coll.Name}) } } else { for _, coll := range collections { - toBackupCollections = append(toBackupCollections, collection{db, coll}) + toBackupCollections = append(toBackupCollections, collectionStruct{db, coll}) } } } @@ -183,14 +183,13 @@ func (b *BackupContext) parseBackupCollections(request *backuppb.CreateBackupReq return nil, err } for _, db := range dbs { - b.getMilvusClient().UsingDatabase(b.ctx, db.Name) - collections, err := b.getMilvusClient().ListCollections(b.ctx) + collections, err := b.getMilvusClient().ListCollections(b.ctx, db.Name) if err != nil { log.Error("fail in ListCollections", zap.Error(err)) return nil, err } for _, coll := range collections { - toBackupCollections = append(toBackupCollections, collection{db.Name, coll.Name}) + toBackupCollections = append(toBackupCollections, collectionStruct{db.Name, coll.Name}) } } log.Debug(fmt.Sprintf("List %v collections", len(toBackupCollections))) @@ -202,9 +201,8 @@ func (b *BackupContext) parseBackupCollections(request *backuppb.CreateBackupReq dbName = splits[0] collectionName = splits[1] } - b.getMilvusClient().UsingDatabase(b.ctx, dbName) - exist, err := b.getMilvusClient().HasCollection(b.ctx, collectionName) + exist, err := b.getMilvusClient().HasCollection(b.ctx, dbName, collectionName) if err != nil { log.Error("fail in HasCollection", zap.Error(err)) return nil, err @@ -214,13 +212,398 @@ func (b *BackupContext) parseBackupCollections(request *backuppb.CreateBackupReq log.Error(errMsg) return nil, errors.New(errMsg) } - toBackupCollections = append(toBackupCollections, collection{dbName, collectionName}) + toBackupCollections = append(toBackupCollections, collectionStruct{dbName, collectionName}) } } return toBackupCollections, nil } +func (b *BackupContext) backupCollection(ctx context.Context, backupInfo *backuppb.BackupInfo, collection collectionStruct, force bool) error { + log.Info("start backup collection", zap.String("db", collection.db), zap.String("collection", collection.collectionName)) + // list collection result is not complete + completeCollection, err := b.getMilvusClient().DescribeCollection(b.ctx, collection.db, collection.collectionName) + if err != nil { + log.Error("fail in DescribeCollection", zap.Error(err)) + return err + } + fields := make([]*backuppb.FieldSchema, 0) + for _, field := range completeCollection.Schema.Fields { + fields = append(fields, &backuppb.FieldSchema{ + FieldID: field.ID, + Name: field.Name, + IsPrimaryKey: field.PrimaryKey, + Description: field.Description, + AutoID: field.AutoID, + DataType: backuppb.DataType(field.DataType), + TypeParams: utils.MapToKVPair(field.TypeParams), + IndexParams: utils.MapToKVPair(field.IndexParams), + IsDynamic: field.IsDynamic, + IsPartitionKey: field.IsPartitionKey, + }) + } + schema := &backuppb.CollectionSchema{ + Name: completeCollection.Schema.CollectionName, + Description: completeCollection.Schema.Description, + AutoID: completeCollection.Schema.AutoID, + Fields: fields, + EnableDynamicField: completeCollection.Schema.EnableDynamicField, + } + + indexInfos := make([]*backuppb.IndexInfo, 0) + indexDict := make(map[string]*backuppb.IndexInfo, 0) + log.Info("try to get index", + zap.String("collection_name", completeCollection.Name)) + for _, field := range completeCollection.Schema.Fields { + //if field.DataType != entity.FieldTypeBinaryVector && field.DataType != entity.FieldTypeFloatVector { + // continue + //} + fieldIndex, err := b.getMilvusClient().DescribeIndex(b.ctx, collection.db, completeCollection.Name, field.Name) + if err != nil { + if strings.Contains(err.Error(), "index not found") || + strings.HasPrefix(err.Error(), "index doesn't exist") { + // todo + log.Info("field has no index", + zap.String("collection_name", completeCollection.Name), + zap.String("field_name", field.Name)) + continue + } else { + log.Error("fail in DescribeIndex", zap.Error(err)) + return err + } + } + log.Info("field index", + zap.String("collection_name", completeCollection.Name), + zap.String("field_name", field.Name), + zap.Any("index info", fieldIndex)) + for _, index := range fieldIndex { + if _, ok := indexDict[index.Name()]; ok { + continue + } else { + indexInfo := &backuppb.IndexInfo{ + FieldName: index.FieldName(), + IndexName: index.Name(), + IndexType: string(index.IndexType()), + Params: index.Params(), + } + indexInfos = append(indexInfos, indexInfo) + indexDict[index.Name()] = indexInfo + } + } + } + + collectionBackup := &backuppb.CollectionBackupInfo{ + Id: utils.UUID(), + StateCode: backuppb.BackupTaskStateCode_BACKUP_INITIAL, + StartTime: time.Now().Unix(), + CollectionId: completeCollection.ID, + DbName: collection.db, // todo currently db_name is not used in many places + CollectionName: completeCollection.Name, + Schema: schema, + ShardsNum: completeCollection.ShardNum, + ConsistencyLevel: backuppb.ConsistencyLevel(completeCollection.ConsistencyLevel), + HasIndex: len(indexInfos) > 0, + IndexInfos: indexInfos, + } + backupInfo.CollectionBackups = append(backupInfo.CollectionBackups, collectionBackup) + + b.refreshBackupCache(backupInfo) + partitionBackupInfos := make([]*backuppb.PartitionBackupInfo, 0) + partitions, err := b.getMilvusClient().ShowPartitions(b.ctx, collectionBackup.GetDbName(), collectionBackup.GetCollectionName()) + if err != nil { + log.Error("fail to ShowPartitions", zap.Error(err)) + return err + } + + // use GetLoadingProgress currently, GetLoadState is a new interface @20230104 milvus pr#21515 + collectionLoadProgress, err := b.getMilvusClient().GetLoadingProgress(ctx, collectionBackup.GetDbName(), collectionBackup.GetCollectionName(), []string{}) + if err != nil { + log.Error("fail to GetLoadingProgress of collection", zap.Error(err)) + return err + } + + var collectionLoadState string + partitionLoadStates := make(map[string]string, 0) + if collectionLoadProgress == 0 { + collectionLoadState = LoadState_NotLoad + for _, partition := range partitions { + partitionLoadStates[partition.Name] = LoadState_NotLoad + } + } else if collectionLoadProgress == 100 { + collectionLoadState = LoadState_Loaded + for _, partition := range partitions { + partitionLoadStates[partition.Name] = LoadState_Loaded + } + } else { + collectionLoadState = LoadState_Loading + for _, partition := range partitions { + loadProgress, err := b.getMilvusClient().GetLoadingProgress(ctx, collectionBackup.GetDbName(), collectionBackup.GetCollectionName(), []string{partition.Name}) + if err != nil { + log.Error("fail to GetLoadingProgress of partition", zap.Error(err)) + return err + } + if loadProgress == 0 { + partitionLoadStates[partition.Name] = LoadState_NotLoad + } else if loadProgress == 100 { + partitionLoadStates[partition.Name] = LoadState_Loaded + } else { + partitionLoadStates[partition.Name] = LoadState_Loading + } + } + } + + // fill segments + filledSegments := make([]*entity.Segment, 0) + if !force { + // Flush + segmentEntitiesBeforeFlush, err := b.getMilvusClient().GetPersistentSegmentInfo(ctx, collectionBackup.GetDbName(), collectionBackup.GetCollectionName()) + if err != nil { + return err + } + log.Info("GetPersistentSegmentInfo before flush from milvus", + zap.String("collectionName", collectionBackup.GetCollectionName()), + zap.Int("segmentNumBeforeFlush", len(segmentEntitiesBeforeFlush))) + newSealedSegmentIDs, flushedSegmentIDs, timeOfSeal, err := b.getMilvusClient().FlushV2(ctx, collectionBackup.GetDbName(), collectionBackup.GetCollectionName(), false) + if err != nil { + log.Error(fmt.Sprintf("fail to flush the collection: %s", collectionBackup.GetCollectionName())) + return err + } + log.Info("flush segments", + zap.String("collectionName", collectionBackup.GetCollectionName()), + zap.Int64s("newSealedSegmentIDs", newSealedSegmentIDs), + zap.Int64s("flushedSegmentIDs", flushedSegmentIDs), + zap.Int64("timeOfSeal", timeOfSeal)) + collectionBackup.BackupTimestamp = utils.ComposeTS(timeOfSeal, 0) + collectionBackup.BackupPhysicalTimestamp = uint64(timeOfSeal) + + flushSegmentIDs := append(newSealedSegmentIDs, flushedSegmentIDs...) + segmentEntitiesAfterFlush, err := b.getMilvusClient().GetPersistentSegmentInfo(ctx, collectionBackup.GetDbName(), collectionBackup.GetCollectionName()) + if err != nil { + return err + } + log.Info("GetPersistentSegmentInfo after flush from milvus", + zap.String("collectionName", collectionBackup.GetCollectionName()), + zap.Int("segmentNumBeforeFlush", len(segmentEntitiesBeforeFlush)), + zap.Int("segmentNumAfterFlush", len(segmentEntitiesAfterFlush))) + segmentDict := utils.ArrayToMap(flushSegmentIDs) + for _, seg := range segmentEntitiesAfterFlush { + sid := seg.ID + if _, ok := segmentDict[sid]; ok { + delete(segmentDict, sid) + filledSegments = append(filledSegments, seg) + } else { + log.Debug("this may be new segments after flush, skip it", zap.Int64("id", sid)) + } + } + for _, seg := range segmentEntitiesBeforeFlush { + sid := seg.ID + if _, ok := segmentDict[sid]; ok { + delete(segmentDict, sid) + filledSegments = append(filledSegments, seg) + } else { + log.Debug("this may be old segments before flush, skip it", zap.Int64("id", sid)) + } + } + if len(segmentDict) > 0 { + // very rare situation, segments return in flush doesn't exist in either segmentEntitiesBeforeFlush and segmentEntitiesAfterFlush + errorMsg := "Segment return in Flush not exist in GetPersistentSegmentInfo. segment ids: " + fmt.Sprint(utils.MapKeyArray(segmentDict)) + log.Warn(errorMsg) + } + } else { + // Flush + segmentEntitiesBeforeFlush, err := b.getMilvusClient().GetPersistentSegmentInfo(ctx, collectionBackup.GetDbName(), collectionBackup.GetCollectionName()) + if err != nil { + return err + } + log.Info("GetPersistentSegmentInfo from milvus", + zap.String("collectionName", collectionBackup.GetCollectionName()), + zap.Int("segmentNum", len(segmentEntitiesBeforeFlush))) + for _, seg := range segmentEntitiesBeforeFlush { + filledSegments = append(filledSegments, seg) + } + } + + if err != nil { + collectionBackup.StateCode = backuppb.BackupTaskStateCode_BACKUP_FAIL + collectionBackup.ErrorMessage = err.Error() + return err + } + log.Info("Finished fill segment", + zap.String("collectionName", collectionBackup.GetCollectionName())) + + segmentBackupInfos := make([]*backuppb.SegmentBackupInfo, 0) + partSegInfoMap := make(map[int64][]*backuppb.SegmentBackupInfo) + + segmentLevelBackupInfos := make([]*backuppb.SegmentBackupInfo, 0) + + for _, segment := range filledSegments { + segmentInfo, err := b.readSegmentInfo(ctx, segment.CollectionID, segment.ParititionID, segment.ID, segment.NumRows) + if err != nil { + return err + } + if len(segmentInfo.Binlogs) == 0 { + log.Warn("this segment has no insert binlog", zap.Int64("id", segment.ID)) + } + partSegInfoMap[segment.ParititionID] = append(partSegInfoMap[segment.ParititionID], segmentInfo) + segmentBackupInfos = append(segmentBackupInfos, segmentInfo) + segmentLevelBackupInfos = append(segmentLevelBackupInfos, segmentInfo) + } + log.Info("readSegmentInfo from storage", + zap.String("collectionName", collectionBackup.GetCollectionName()), + zap.Int("segmentNum", len(filledSegments))) + + for _, partition := range partitions { + partitionSegments := partSegInfoMap[partition.ID] + var size int64 = 0 + for _, seg := range partitionSegments { + size += seg.GetSize() + } + partitionBackupInfo := &backuppb.PartitionBackupInfo{ + PartitionId: partition.ID, + PartitionName: partition.Name, + CollectionId: collectionBackup.GetCollectionId(), + SegmentBackups: partSegInfoMap[partition.ID], + Size: size, + LoadState: partitionLoadStates[partition.Name], + } + partitionBackupInfos = append(partitionBackupInfos, partitionBackupInfo) + //partitionLevelBackupInfos = append(partitionLevelBackupInfos, partitionBackupInfo) + } + + //leveledBackupInfo.partitionLevel = &backuppb.PartitionLevelBackupInfo{ + // Infos: partitionLevelBackupInfos, + //} + collectionBackup.PartitionBackups = partitionBackupInfos + collectionBackup.LoadState = collectionLoadState + b.refreshBackupCache(backupInfo) + log.Info("finish build partition info", + zap.String("collectionName", collectionBackup.GetCollectionName()), + zap.Int("partitionNum", len(partitionBackupInfos))) + + log.Info("Begin copy data", + zap.String("collectionName", collectionBackup.GetCollectionName()), + zap.Int("segmentNum", len(segmentBackupInfos))) + + var collectionBackupSize int64 = 0 + for _, part := range partitionBackupInfos { + collectionBackupSize += part.GetSize() + if part.GetSize() > b.params.BackupCfg.MaxSegmentGroupSize { + log.Info("partition size is larger than MaxSegmentGroupSize, will separate segments into groups in backup files", + zap.Int64("collectionId", part.GetCollectionId()), + zap.Int64("partitionId", part.GetPartitionId()), + zap.Int64("partitionSize", part.GetSize()), + zap.Int64("MaxSegmentGroupSize", b.params.BackupCfg.MaxSegmentGroupSize)) + segments := partSegInfoMap[part.GetPartitionId()] + var bufferSize int64 = 0 + // 0 is illegal value, start from 1 + var segGroupID int64 = 1 + for _, seg := range segments { + if seg.Size > b.params.BackupCfg.MaxSegmentGroupSize && bufferSize == 0 { + seg.GroupId = segGroupID + segGroupID = segGroupID + 1 + } else if bufferSize+seg.Size > b.params.BackupCfg.MaxSegmentGroupSize { + segGroupID = segGroupID + 1 + seg.GroupId = segGroupID + bufferSize = 0 + bufferSize = bufferSize + seg.Size + } else { + seg.GroupId = segGroupID + bufferSize = bufferSize + seg.Size + } + } + } else { + log.Info("partition size is smaller than MaxSegmentGroupSize, won't separate segments into groups in backup files", + zap.Int64("collectionId", part.GetCollectionId()), + zap.Int64("partitionId", part.GetPartitionId()), + zap.Int64("partitionSize", part.GetSize()), + zap.Int64("MaxSegmentGroupSize", b.params.BackupCfg.MaxSegmentGroupSize)) + } + } + + err = b.copySegments(ctx, segmentBackupInfos, BackupBinlogDirPath(b.backupRootPath, backupInfo.GetName())) + b.refreshBackupCache(backupInfo) + + collectionBackup.Size = collectionBackupSize + collectionBackup.EndTime = time.Now().Unix() + return nil +} + +func (b *BackupContext) executeCreateBackupV2(ctx context.Context, request *backuppb.CreateBackupRequest, backupInfo *backuppb.BackupInfo) (*backuppb.BackupInfo, error) { + b.mu.Lock() + defer b.mu.Unlock() + + wp, err := common.NewWorkerPool(ctx, b.params.BackupCfg.BackupParallelism, RPS) + if err != nil { + return backupInfo, err + } + wp.Start() + log.Info("Start collection level backup pool", zap.Int("parallelism", b.params.BackupCfg.BackupParallelism)) + + backupInfo.BackupTimestamp = uint64(time.Now().UnixNano() / int64(time.Millisecond)) + backupInfo.StateCode = backuppb.BackupTaskStateCode_BACKUP_EXECUTING + + defer b.refreshBackupCache(backupInfo) + + // 1, get collection level meta + toBackupCollections, err := b.parseBackupCollections(request) + if err != nil { + log.Error("parse backup collections from request failed", zap.Error(err)) + return backupInfo, err + } + collectionNames := make([]string, len(toBackupCollections)) + for i, coll := range toBackupCollections { + collectionNames[i] = coll.collectionName + } + log.Info("collections to backup", zap.Strings("collections", collectionNames)) + + for _, collection := range toBackupCollections { + collectionClone := collection + job := func(ctx context.Context) error { + err := b.backupCollection(ctx, backupInfo, collectionClone, request.GetForce()) + return err + } + wp.Submit(job) + } + wp.Done() + if err := wp.Wait(); err != nil { + return backupInfo, err + } + + var backupSize int64 = 0 + leveledBackupInfo, err := treeToLevel(backupInfo) + if err != nil { + return backupInfo, err + } + for _, coll := range leveledBackupInfo.collectionLevel.GetInfos() { + backupSize += coll.GetSize() + } + backupInfo.Size = backupSize + backupInfo.EndTime = time.Now().UnixNano() / int64(time.Millisecond) + backupInfo.StateCode = backuppb.BackupTaskStateCode_BACKUP_SUCCESS + b.refreshBackupCache(backupInfo) + + // 7, write meta data + output, _ := serialize(backupInfo) + log.Debug("backup meta", zap.String("value", string(output.BackupMetaBytes))) + log.Debug("collection meta", zap.String("value", string(output.CollectionMetaBytes))) + log.Debug("partition meta", zap.String("value", string(output.PartitionMetaBytes))) + log.Debug("segment meta", zap.String("value", string(output.SegmentMetaBytes))) + + b.getStorageClient().Write(ctx, b.backupBucketName, BackupMetaPath(b.backupRootPath, backupInfo.GetName()), output.BackupMetaBytes) + b.getStorageClient().Write(ctx, b.backupBucketName, CollectionMetaPath(b.backupRootPath, backupInfo.GetName()), output.CollectionMetaBytes) + b.getStorageClient().Write(ctx, b.backupBucketName, PartitionMetaPath(b.backupRootPath, backupInfo.GetName()), output.PartitionMetaBytes) + b.getStorageClient().Write(ctx, b.backupBucketName, SegmentMetaPath(b.backupRootPath, backupInfo.GetName()), output.SegmentMetaBytes) + b.getStorageClient().Write(ctx, b.backupBucketName, FullMetaPath(b.backupRootPath, backupInfo.GetName()), output.FullMetaBytes) + + log.Info("finish executeCreateBackup", + zap.String("requestId", request.GetRequestId()), + zap.String("backupName", request.GetBackupName()), + zap.Strings("collections", request.GetCollectionNames()), + zap.Bool("async", request.GetAsync()), + zap.String("backup meta", string(output.BackupMetaBytes))) + return backupInfo, nil +} + func (b *BackupContext) executeCreateBackup(ctx context.Context, request *backuppb.CreateBackupRequest, backupInfo *backuppb.BackupInfo) (*backuppb.BackupInfo, error) { b.mu.Lock() defer b.mu.Unlock() @@ -249,8 +632,7 @@ func (b *BackupContext) executeCreateBackup(ctx context.Context, request *backup partitionLevelBackupInfos := make([]*backuppb.PartitionBackupInfo, 0) for _, collection := range toBackupCollections { // list collection result is not complete - b.getMilvusClient().UsingDatabase(b.ctx, collection.db) - completeCollection, err := b.getMilvusClient().DescribeCollection(b.ctx, collection.collectionName) + completeCollection, err := b.getMilvusClient().DescribeCollection(b.ctx, collection.db, collection.collectionName) if err != nil { log.Error("fail in DescribeCollection", zap.Error(err)) return backupInfo, err @@ -286,7 +668,7 @@ func (b *BackupContext) executeCreateBackup(ctx context.Context, request *backup //if field.DataType != entity.FieldTypeBinaryVector && field.DataType != entity.FieldTypeFloatVector { // continue //} - fieldIndex, err := b.getMilvusClient().DescribeIndex(b.ctx, completeCollection.Name, field.Name) + fieldIndex, err := b.getMilvusClient().DescribeIndex(b.ctx, collection.db, completeCollection.Name, field.Name) if err != nil { if strings.Contains(err.Error(), "index not found") || strings.HasPrefix(err.Error(), "index doesn't exist") { @@ -343,16 +725,15 @@ func (b *BackupContext) executeCreateBackup(ctx context.Context, request *backup segmentLevelBackupInfos := make([]*backuppb.SegmentBackupInfo, 0) // backup collection for _, collection := range collectionBackupInfos { - b.getMilvusClient().UsingDatabase(b.ctx, collection.GetDbName()) partitionBackupInfos := make([]*backuppb.PartitionBackupInfo, 0) - partitions, err := b.getMilvusClient().ShowPartitions(b.ctx, collection.GetCollectionName()) + partitions, err := b.getMilvusClient().ShowPartitions(b.ctx, collection.GetDbName(), collection.GetCollectionName()) if err != nil { log.Error("fail to ShowPartitions", zap.Error(err)) return backupInfo, err } // use GetLoadingProgress currently, GetLoadState is a new interface @20230104 milvus pr#21515 - collectionLoadProgress, err := b.getMilvusClient().GetLoadingProgress(ctx, collection.GetCollectionName(), []string{}) + collectionLoadProgress, err := b.getMilvusClient().GetLoadingProgress(ctx, collection.GetDbName(), collection.GetCollectionName(), []string{}) if err != nil { log.Error("fail to GetLoadingProgress of collection", zap.Error(err)) return backupInfo, err @@ -373,7 +754,7 @@ func (b *BackupContext) executeCreateBackup(ctx context.Context, request *backup } else { collectionLoadState = LoadState_Loading for _, partition := range partitions { - loadProgress, err := b.getMilvusClient().GetLoadingProgress(ctx, collection.GetCollectionName(), []string{partition.Name}) + loadProgress, err := b.getMilvusClient().GetLoadingProgress(ctx, collection.GetDbName(), collection.GetCollectionName(), []string{partition.Name}) if err != nil { log.Error("fail to GetLoadingProgress of partition", zap.Error(err)) return backupInfo, err @@ -392,16 +773,16 @@ func (b *BackupContext) executeCreateBackup(ctx context.Context, request *backup filledSegments := make([]*entity.Segment, 0) if !request.GetForce() { // Flush - segmentEntitiesBeforeFlush, err := b.getMilvusClient().GetPersistentSegmentInfo(ctx, collection.GetCollectionName()) + segmentEntitiesBeforeFlush, err := b.getMilvusClient().GetPersistentSegmentInfo(ctx, collection.GetDbName(), collection.GetCollectionName()) if err != nil { return backupInfo, err } log.Info("GetPersistentSegmentInfo before flush from milvus", zap.String("collectionName", collection.GetCollectionName()), zap.Int("segmentNumBeforeFlush", len(segmentEntitiesBeforeFlush))) - newSealedSegmentIDs, flushedSegmentIDs, timeOfSeal, err := b.getMilvusClient().FlushV2(ctx, collection.GetCollectionName(), false) + newSealedSegmentIDs, flushedSegmentIDs, timeOfSeal, err := b.getMilvusClient().FlushV2(ctx, collection.GetDbName(), collection.GetCollectionName(), false) if err != nil { - log.Error(fmt.Sprintf("fail to flush the collection: %s", collection.GetCollectionName())) + log.Error(fmt.Sprintf("fail to flush the collection: %s.%s", collection.GetDbName(), collection.GetCollectionName())) return backupInfo, err } log.Info("flush segments", @@ -413,7 +794,7 @@ func (b *BackupContext) executeCreateBackup(ctx context.Context, request *backup collection.BackupPhysicalTimestamp = uint64(timeOfSeal) flushSegmentIDs := append(newSealedSegmentIDs, flushedSegmentIDs...) - segmentEntitiesAfterFlush, err := b.getMilvusClient().GetPersistentSegmentInfo(ctx, collection.GetCollectionName()) + segmentEntitiesAfterFlush, err := b.getMilvusClient().GetPersistentSegmentInfo(ctx, collection.GetDbName(), collection.GetCollectionName()) if err != nil { return backupInfo, err } @@ -447,7 +828,7 @@ func (b *BackupContext) executeCreateBackup(ctx context.Context, request *backup } } else { // Flush - segmentEntitiesBeforeFlush, err := b.getMilvusClient().GetPersistentSegmentInfo(ctx, collection.GetCollectionName()) + segmentEntitiesBeforeFlush, err := b.getMilvusClient().GetPersistentSegmentInfo(ctx, collection.GetDbName(), collection.GetCollectionName()) if err != nil { return backupInfo, err } @@ -596,7 +977,7 @@ func (b *BackupContext) executeCreateBackup(ctx context.Context, request *backup } func (b *BackupContext) copySegments(ctx context.Context, segments []*backuppb.SegmentBackupInfo, dstPath string) error { - wp, err := common.NewWorkerPool(ctx, WORKER_NUM, RPS) + wp, err := common.NewWorkerPool(ctx, b.params.BackupCfg.BackupCopyDataParallelism, RPS) if err != nil { return err } diff --git a/core/backup_impl_restore_backup.go b/core/backup_impl_restore_backup.go index 4dbfdbff..ca461c49 100644 --- a/core/backup_impl_restore_backup.go +++ b/core/backup_impl_restore_backup.go @@ -14,6 +14,7 @@ import ( "github.com/zilliztech/milvus-backup/core/proto/backuppb" "github.com/zilliztech/milvus-backup/core/utils" + "github.com/zilliztech/milvus-backup/internal/common" "github.com/zilliztech/milvus-backup/internal/log" "github.com/zilliztech/milvus-backup/internal/util/retry" ) @@ -236,17 +237,9 @@ func (b *BackupContext) RestoreBackup(ctx context.Context, request *backuppb.Res } log.Info("create database", zap.String("database", targetDBName)) } - err = b.getMilvusClient().UsingDatabase(ctx, targetDBName) - if err != nil { - errorMsg := fmt.Sprintf("fail to switch database %s, err: %s", targetDBName, err) - log.Error(errorMsg) - resp.Code = backuppb.ResponseCode_Fail - resp.Msg = errorMsg - return resp - } // check if the collection exist, if exist, will not restore - exist, err := b.getMilvusClient().HasCollection(ctx, targetCollectionName) + exist, err := b.getMilvusClient().HasCollection(ctx, targetDBName, targetCollectionName) if err != nil { errorMsg := fmt.Sprintf("fail to check whether the collection is exist, collection_name: %s, err: %s", targetDBCollectionName, err) log.Error(errorMsg) @@ -315,6 +308,13 @@ func (b *BackupContext) executeRestoreBackupTask(ctx context.Context, backupBuck b.mu.Lock() defer b.mu.Unlock() + wp, err := common.NewWorkerPool(ctx, b.params.BackupCfg.RestoreParallelism, RPS) + if err != nil { + return task, err + } + wp.Start() + log.Info("Start collection level restore pool", zap.Int("parallelism", b.params.BackupCfg.RestoreParallelism)) + id := task.GetId() b.restoreTasks[id] = task task.StateCode = backuppb.RestoreTaskStateCode_EXECUTING @@ -332,25 +332,34 @@ func (b *BackupContext) executeRestoreBackupTask(ctx context.Context, backupBuck // 3, execute restoreCollectionTasks for _, restoreCollectionTask := range restoreCollectionTasks { - endTask, err := b.executeRestoreCollectionTask(ctx, backupBucketName, backupPath, restoreCollectionTask, id) - if err != nil { - log.Error("executeRestoreCollectionTask failed", - zap.String("TargetDBName", restoreCollectionTask.GetTargetDbName()), - zap.String("TargetCollectionName", restoreCollectionTask.GetTargetCollectionName()), - zap.Error(err)) - return task, err - } - log.Info("finish restore collection", - zap.String("db_name", restoreCollectionTask.GetTargetDbName()), - zap.String("collection_name", restoreCollectionTask.GetTargetCollectionName())) - restoreCollectionTask.StateCode = backuppb.RestoreTaskStateCode_SUCCESS - task.RestoredSize += endTask.RestoredSize - if task.GetToRestoreSize() == 0 { - task.Progress = 100 - } else { - task.Progress = int32(100 * task.GetRestoredSize() / task.GetToRestoreSize()) + restoreCollectionTaskClone := restoreCollectionTask + job := func(ctx context.Context) error { + endTask, err := b.executeRestoreCollectionTask(ctx, backupBucketName, backupPath, restoreCollectionTaskClone, id) + if err != nil { + log.Error("executeRestoreCollectionTask failed", + zap.String("TargetDBName", restoreCollectionTaskClone.GetTargetDbName()), + zap.String("TargetCollectionName", restoreCollectionTaskClone.GetTargetCollectionName()), + zap.Error(err)) + return err + } + log.Info("finish restore collection", + zap.String("db_name", restoreCollectionTaskClone.GetTargetDbName()), + zap.String("collection_name", restoreCollectionTaskClone.GetTargetCollectionName())) + restoreCollectionTaskClone.StateCode = backuppb.RestoreTaskStateCode_SUCCESS + task.RestoredSize += endTask.RestoredSize + if task.GetToRestoreSize() == 0 { + task.Progress = 100 + } else { + task.Progress = int32(100 * task.GetRestoredSize() / task.GetToRestoreSize()) + } + updateRestoreTaskFunc(id, task) + return nil } - updateRestoreTaskFunc(id, task) + wp.Submit(job) + } + wp.Done() + if err := wp.Wait(); err != nil { + return task, err } task.StateCode = backuppb.RestoreTaskStateCode_SUCCESS @@ -398,13 +407,12 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup EnableDynamicField: task.GetCollBackup().GetSchema().GetEnableDynamicField(), } - b.getMilvusClient().UsingDatabase(ctx, targetDBName) - err := retry.Do(ctx, func() error { if hasPartitionKey { partitionNum := len(task.GetCollBackup().GetPartitionBackups()) return b.getMilvusClient().CreateCollection( ctx, + targetDBName, collectionSchema, task.GetCollBackup().GetShardsNum(), gomilvus.WithConsistencyLevel(entity.ConsistencyLevel(task.GetCollBackup().GetConsistencyLevel())), @@ -412,6 +420,7 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup } return b.getMilvusClient().CreateCollection( ctx, + targetDBName, collectionSchema, task.GetCollBackup().GetShardsNum(), gomilvus.WithConsistencyLevel(entity.ConsistencyLevel(task.GetCollBackup().GetConsistencyLevel()))) @@ -432,7 +441,7 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup indexes := task.GetCollBackup().GetIndexInfos() for _, index := range indexes { idx := entity.NewGenericIndex(index.GetIndexName(), entity.IndexType(index.GetIndexType()), index.GetFieldName(), index.GetParams()) - err := b.getMilvusClient().CreateIndex(ctx, targetCollectionName, index.GetFieldName(), idx, true) + err := b.getMilvusClient().CreateIndex(ctx, targetDBName, targetCollectionName, index.GetFieldName(), idx, true) if err != nil { log.Warn("Fail to restore index", zap.Error(err)) return task, err @@ -454,14 +463,14 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup }() for _, partitionBackup := range task.GetCollBackup().GetPartitionBackups() { - exist, err := b.getMilvusClient().HasPartition(ctx, targetCollectionName, partitionBackup.GetPartitionName()) + exist, err := b.getMilvusClient().HasPartition(ctx, targetDBName, targetCollectionName, partitionBackup.GetPartitionName()) if err != nil { log.Error("fail to check has partition", zap.Error(err)) return task, err } if !exist { err = retry.Do(ctx, func() error { - return b.getMilvusClient().CreatePartition(ctx, targetCollectionName, partitionBackup.GetPartitionName()) + return b.getMilvusClient().CreatePartition(ctx, targetDBName, targetCollectionName, partitionBackup.GetPartitionName()) }, retry.Attempts(10), retry.Sleep(1*time.Second)) if err != nil { log.Error("fail to create partition", zap.Error(err)) @@ -495,10 +504,11 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup realFiles = files } - err = b.executeBulkInsert(ctx, targetCollectionName, partitionBackup.GetPartitionName(), realFiles, int64(task.GetCollBackup().BackupTimestamp)) + err = b.executeBulkInsert(ctx, targetDBName, targetCollectionName, partitionBackup.GetPartitionName(), realFiles, int64(task.GetCollBackup().BackupTimestamp)) if err != nil { log.Error("fail to bulk insert to partition", zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()), + zap.String("targetDBName", targetDBName), zap.String("targetCollectionName", targetCollectionName), zap.String("partition", partitionBackup.GetPartitionName()), zap.Error(err)) @@ -578,8 +588,9 @@ func collectGroupIdsFromSegments(segments []*backuppb.SegmentBackupInfo) []int64 return res } -func (b *BackupContext) executeBulkInsert(ctx context.Context, coll string, partition string, files []string, endTime int64) error { +func (b *BackupContext) executeBulkInsert(ctx context.Context, db, coll string, partition string, files []string, endTime int64) error { log.Debug("execute bulk insert", + zap.String("db", db), zap.String("collection", coll), zap.String("partition", partition), zap.Strings("files", files), @@ -587,16 +598,17 @@ func (b *BackupContext) executeBulkInsert(ctx context.Context, coll string, part var taskId int64 var err error if endTime == 0 { - taskId, err = b.getMilvusClient().BulkInsert(ctx, coll, partition, files, gomilvus.IsBackup()) + taskId, err = b.getMilvusClient().BulkInsert(ctx, db, coll, partition, files, gomilvus.IsBackup()) } else { - taskId, err = b.getMilvusClient().BulkInsert(ctx, coll, partition, files, gomilvus.IsBackup(), gomilvus.WithEndTs(endTime)) + taskId, err = b.getMilvusClient().BulkInsert(ctx, db, coll, partition, files, gomilvus.IsBackup(), gomilvus.WithEndTs(endTime)) } if err != nil { log.Error("fail to bulk insert", - zap.Error(err), + zap.String("db", db), zap.String("collectionName", coll), zap.String("partitionName", partition), - zap.Strings("files", files)) + zap.Strings("files", files), + zap.Error(err)) return err } err = b.watchBulkInsertState(ctx, taskId, BULKINSERT_TIMEOUT, BULKINSERT_SLEEP_INTERVAL) diff --git a/core/milvus_sdk_wrapper.go b/core/milvus_sdk_wrapper.go new file mode 100644 index 00000000..ec8cb743 --- /dev/null +++ b/core/milvus_sdk_wrapper.go @@ -0,0 +1,164 @@ +package core + +import ( + "context" + gomilvus "github.com/milvus-io/milvus-sdk-go/v2/client" + "github.com/milvus-io/milvus-sdk-go/v2/entity" + "sync" +) + +// MilvusClient wrap db into milvus API to make it thread safe +type MilvusClient struct { + mu sync.Mutex + client gomilvus.Client +} + +func (m *MilvusClient) Close() error { + return m.client.Close() +} + +func (m *MilvusClient) GetVersion(ctx context.Context) (string, error) { + return m.client.GetVersion(ctx) +} + +func (m *MilvusClient) CreateDatabase(ctx context.Context, dbName string) error { + return m.client.CreateDatabase(ctx, dbName) +} + +func (m *MilvusClient) ListDatabases(ctx context.Context) ([]entity.Database, error) { + return m.client.ListDatabases(ctx) +} + +func (m *MilvusClient) DescribeCollection(ctx context.Context, db, collName string) (*entity.Collection, error) { + m.mu.Lock() + defer m.mu.Unlock() + err := m.client.UsingDatabase(ctx, db) + if err != nil { + return nil, err + } + return m.client.DescribeCollection(ctx, collName) +} + +func (m *MilvusClient) DescribeIndex(ctx context.Context, db, collName, fieldName string) ([]entity.Index, error) { + m.mu.Lock() + defer m.mu.Unlock() + err := m.client.UsingDatabase(ctx, db) + if err != nil { + return nil, err + } + return m.client.DescribeIndex(ctx, collName, fieldName) +} + +func (m *MilvusClient) ShowPartitions(ctx context.Context, db, collName string) ([]*entity.Partition, error) { + m.mu.Lock() + defer m.mu.Unlock() + err := m.client.UsingDatabase(ctx, db) + if err != nil { + return nil, err + } + return m.client.ShowPartitions(ctx, collName) +} + +func (m *MilvusClient) GetLoadingProgress(ctx context.Context, db, collName string, partitionNames []string) (int64, error) { + m.mu.Lock() + defer m.mu.Unlock() + err := m.client.UsingDatabase(ctx, db) + if err != nil { + return 0, err + } + return m.client.GetLoadingProgress(ctx, collName, partitionNames) +} + +func (m *MilvusClient) GetPersistentSegmentInfo(ctx context.Context, db, collName string) ([]*entity.Segment, error) { + m.mu.Lock() + defer m.mu.Unlock() + err := m.client.UsingDatabase(ctx, db) + if err != nil { + return nil, err + } + return m.client.GetPersistentSegmentInfo(ctx, collName) +} + +func (m *MilvusClient) FlushV2(ctx context.Context, db, collName string, async bool) ([]int64, []int64, int64, error) { + m.mu.Lock() + defer m.mu.Unlock() + err := m.client.UsingDatabase(ctx, db) + if err != nil { + return nil, nil, 0, err + } + return m.client.FlushV2(ctx, collName, async) +} + +func (m *MilvusClient) ListCollections(ctx context.Context, db string) ([]*entity.Collection, error) { + m.mu.Lock() + defer m.mu.Unlock() + err := m.client.UsingDatabase(ctx, db) + if err != nil { + return nil, err + } + return m.client.ListCollections(ctx) +} + +func (m *MilvusClient) HasCollection(ctx context.Context, db, collName string) (bool, error) { + m.mu.Lock() + defer m.mu.Unlock() + err := m.client.UsingDatabase(ctx, db) + if err != nil { + return false, err + } + return m.client.HasCollection(ctx, collName) +} + +func (m *MilvusClient) BulkInsert(ctx context.Context, db, collName string, partitionName string, files []string, opts ...gomilvus.BulkInsertOption) (int64, error) { + m.mu.Lock() + defer m.mu.Unlock() + err := m.client.UsingDatabase(ctx, db) + if err != nil { + return 0, err + } + return m.client.BulkInsert(ctx, collName, partitionName, files, opts...) +} + +func (m *MilvusClient) GetBulkInsertState(ctx context.Context, taskID int64) (*entity.BulkInsertTaskState, error) { + return m.client.GetBulkInsertState(ctx, taskID) +} + +func (m *MilvusClient) CreateCollection(ctx context.Context, db string, schema *entity.Schema, shardsNum int32, opts ...gomilvus.CreateCollectionOption) error { + m.mu.Lock() + defer m.mu.Unlock() + err := m.client.UsingDatabase(ctx, db) + if err != nil { + return err + } + return m.client.CreateCollection(ctx, schema, shardsNum, opts...) +} + +func (m *MilvusClient) CreatePartition(ctx context.Context, db, collName string, partitionName string) error { + m.mu.Lock() + defer m.mu.Unlock() + err := m.client.UsingDatabase(ctx, db) + if err != nil { + return err + } + return m.client.CreatePartition(ctx, collName, partitionName) +} + +func (m *MilvusClient) HasPartition(ctx context.Context, db, collName string, partitionName string) (bool, error) { + m.mu.Lock() + defer m.mu.Unlock() + err := m.client.UsingDatabase(ctx, db) + if err != nil { + return false, err + } + return m.client.HasPartition(ctx, collName, partitionName) +} + +func (m *MilvusClient) CreateIndex(ctx context.Context, db, collName string, fieldName string, idx entity.Index, async bool, opts ...gomilvus.IndexOption) error { + m.mu.Lock() + defer m.mu.Unlock() + err := m.client.UsingDatabase(ctx, db) + if err != nil { + return err + } + return m.client.CreateIndex(ctx, collName, fieldName, idx, async, opts...) +} diff --git a/core/paramtable/params.go b/core/paramtable/params.go index 65865c8f..4befad13 100644 --- a/core/paramtable/params.go +++ b/core/paramtable/params.go @@ -33,12 +33,19 @@ type BackupConfig struct { Base *BaseTable MaxSegmentGroupSize int64 + + BackupParallelism int + RestoreParallelism int + BackupCopyDataParallelism int } func (p *BackupConfig) init(base *BaseTable) { p.Base = base p.initMaxSegmentGroupSize() + p.initBackupParallelism() + p.initRestoreParallelism() + p.initBackupCopyDataParallelism() } func (p *BackupConfig) initMaxSegmentGroupSize() { @@ -49,6 +56,21 @@ func (p *BackupConfig) initMaxSegmentGroupSize() { p.MaxSegmentGroupSize = size } +func (p *BackupConfig) initBackupParallelism() { + size := p.Base.ParseIntWithDefault("backup.parallelism", 1) + p.BackupParallelism = size +} + +func (p *BackupConfig) initRestoreParallelism() { + size := p.Base.ParseIntWithDefault("restore.parallelism", 1) + p.RestoreParallelism = size +} + +func (p *BackupConfig) initBackupCopyDataParallelism() { + size := p.Base.ParseIntWithDefault("backup.copydata.parallelism", 10) + p.BackupCopyDataParallelism = size +} + type MilvusConfig struct { Base *BaseTable diff --git a/ut_test.go b/ut_test.go deleted file mode 100644 index 06ab7d0f..00000000 --- a/ut_test.go +++ /dev/null @@ -1 +0,0 @@ -package main