Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add example for partition data #210

Merged
merged 2 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cmd/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ var (
renameCollectionNames string
restoreDatabases string
restoreDatabaseCollections string
restoreMetaOnly bool
restoreIndex bool
)

var restoreBackupCmd = &cobra.Command{
Expand Down Expand Up @@ -78,6 +80,8 @@ var restoreBackupCmd = &cobra.Command{
CollectionSuffix: renameSuffix,
CollectionRenames: renameMap,
DbCollections: utils.WrapDBCollections(restoreDatabaseCollections),
MetaOnly: restoreMetaOnly,
RestoreIndex: restoreIndex,
})

fmt.Println(resp.GetCode(), "\n", resp.GetMsg())
Expand All @@ -92,5 +96,8 @@ func init() {
restoreBackupCmd.Flags().StringVarP(&restoreDatabases, "databases", "d", "", "databases to restore, if not set, restore all databases")
restoreBackupCmd.Flags().StringVarP(&restoreDatabaseCollections, "database_collections", "a", "", "databases and collections to restore, json format: {\"db1\":[\"c1\", \"c2\"],\"db2\":[]}")

restoreBackupCmd.Flags().BoolVarP(&restoreMetaOnly, "meta_only", "", false, "if set true, will restore meta only")
restoreBackupCmd.Flags().BoolVarP(&restoreIndex, "restore_index", "", false, "if set true, will restore index")

rootCmd.AddCommand(restoreBackupCmd)
}
80 changes: 50 additions & 30 deletions core/backup_impl_restore_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ func (b *BackupContext) RestoreBackup(ctx context.Context, request *backuppb.Res
log.Info("receive RestoreBackupRequest",
zap.String("requestId", request.GetRequestId()),
zap.String("backupName", request.GetBackupName()),
zap.Bool("onlyMeta", request.GetMetaOnly()),
zap.Bool("restoreIndex", request.GetRestoreIndex()),
zap.Strings("collections", request.GetCollectionNames()),
zap.String("CollectionSuffix", request.GetCollectionSuffix()),
zap.Any("CollectionRenames", request.GetCollectionRenames()),
Expand Down Expand Up @@ -277,6 +279,8 @@ func (b *BackupContext) RestoreBackup(ctx context.Context, request *backuppb.Res
ToRestoreSize: toRestoreSize,
RestoredSize: 0,
Progress: 0,
MetaOnly: request.GetMetaOnly(),
RestoreIndex: request.GetRestoreIndex(),
}
restoreCollectionTasks = append(restoreCollectionTasks, restoreCollectionTask)
task.CollectionRestoreTasks = restoreCollectionTasks
Expand Down Expand Up @@ -424,6 +428,18 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup
zap.String("collectionName", targetCollectionName),
zap.Bool("hasPartitionKey", hasPartitionKey))

if task.GetRestoreIndex() {
indexes := task.GetCollBackup().GetIndexInfos()
for _, index := range indexes {
idx := entity.NewGenericIndex(index.GetIndexName(), entity.IndexType(index.GetIndexType()), index.GetFieldName(), index.GetParams())
err := b.getMilvusClient().CreateIndex(ctx, targetCollectionName, index.GetFieldName(), idx, true)
if err != nil {
log.Warn("Fail to restore index", zap.Error(err))
return task, err
}
}
}

tempDir := "restore-temp-" + parentTaskID + SEPERATOR
isSameBucket := b.milvusBucketName == backupBucketName
// clean the temporary file
Expand Down Expand Up @@ -491,31 +507,13 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup
return nil
}

groupIds := collectGroupIdsFromSegments(partitionBackup.GetSegmentBackups())
if len(groupIds) == 1 && groupIds[0] == 0 {
// backward compatible old backup without group id
files, err := b.getBackupPartitionPaths(ctx, backupBucketName, backupPath, partitionBackup)
if err != nil {
log.Error("fail to get partition backup binlog files",
zap.Error(err),
zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
zap.String("targetCollectionName", targetCollectionName),
zap.String("partition", partitionBackup.GetPartitionName()))
return task, err
}
err = copyAndBulkInsert(files)
if err != nil {
log.Error("fail to (copy and) bulkinsert data",
zap.Error(err),
zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
zap.String("targetCollectionName", targetCollectionName),
zap.String("partition", partitionBackup.GetPartitionName()))
return task, err
}
if task.GetMetaOnly() {
task.Progress = 100
} else {
// bulk insert by segment groups
for _, groupId := range groupIds {
files, err := b.getBackupPartitionPathsWithGroupID(ctx, backupBucketName, backupPath, partitionBackup, groupId)
groupIds := collectGroupIdsFromSegments(partitionBackup.GetSegmentBackups())
if len(groupIds) == 1 && groupIds[0] == 0 {
// backward compatible old backup without group id
files, err := b.getBackupPartitionPaths(ctx, backupBucketName, backupPath, partitionBackup)
if err != nil {
log.Error("fail to get partition backup binlog files",
zap.Error(err),
Expand All @@ -533,13 +531,35 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup
zap.String("partition", partitionBackup.GetPartitionName()))
return task, err
}
} else {
// bulk insert by segment groups
for _, groupId := range groupIds {
files, err := b.getBackupPartitionPathsWithGroupID(ctx, backupBucketName, backupPath, partitionBackup, groupId)
if err != nil {
log.Error("fail to get partition backup binlog files",
zap.Error(err),
zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
zap.String("targetCollectionName", targetCollectionName),
zap.String("partition", partitionBackup.GetPartitionName()))
return task, err
}
err = copyAndBulkInsert(files)
if err != nil {
log.Error("fail to (copy and) bulkinsert data",
zap.Error(err),
zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
zap.String("targetCollectionName", targetCollectionName),
zap.String("partition", partitionBackup.GetPartitionName()))
return task, err
}
}
}
task.RestoredSize = task.RestoredSize + partitionBackup.GetSize()
if task.ToRestoreSize == 0 {
task.Progress = 100
} else {
task.Progress = int32(100 * task.RestoredSize / task.ToRestoreSize)
}
}
task.RestoredSize = task.RestoredSize + partitionBackup.GetSize()
if task.ToRestoreSize == 0 {
task.Progress = 100
} else {
task.Progress = int32(100 * task.RestoredSize / task.ToRestoreSize)
}
}

Expand Down
8 changes: 8 additions & 0 deletions core/proto/backup.proto
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,10 @@ message RestoreBackupRequest {
string path = 8;
// database and collections to restore. A json string. To support database. 2023.7.7
google.protobuf.Value db_collections = 9;
// if true only restore meta
bool metaOnly = 10;
// if true restore index info
bool restoreIndex = 11;
}

message RestorePartitionTask {
Expand All @@ -276,6 +280,10 @@ message RestoreCollectionTask {
int64 to_restore_size = 10;
int32 progress = 11;
string target_db_name = 12;
// if true only restore meta
bool metaOnly = 13;
// if true restore index info
bool restoreIndex = 14;
}

message RestoreBackupTask {
Expand Down
Loading