Skip to content

Commit

Permalink
Add example for partition data (zilliztech#210)
Browse files Browse the repository at this point in the history
* Add example for partition data

Signed-off-by: wayblink <[email protected]>

* Support restore meta only and restore index

Signed-off-by: wayblink <[email protected]>

---------

Signed-off-by: wayblink <[email protected]>
  • Loading branch information
wayblink authored and zhuwenxing committed Feb 21, 2024
1 parent c2aa1af commit 1249083
Show file tree
Hide file tree
Showing 12 changed files with 572 additions and 208 deletions.
7 changes: 7 additions & 0 deletions cmd/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ var (
renameCollectionNames string
restoreDatabases string
restoreDatabaseCollections string
restoreMetaOnly bool
restoreIndex bool
)

var restoreBackupCmd = &cobra.Command{
Expand Down Expand Up @@ -78,6 +80,8 @@ var restoreBackupCmd = &cobra.Command{
CollectionSuffix: renameSuffix,
CollectionRenames: renameMap,
DbCollections: utils.WrapDBCollections(restoreDatabaseCollections),
MetaOnly: restoreMetaOnly,
RestoreIndex: restoreIndex,
})

fmt.Println(resp.GetCode(), "\n", resp.GetMsg())
Expand All @@ -92,5 +96,8 @@ func init() {
restoreBackupCmd.Flags().StringVarP(&restoreDatabases, "databases", "d", "", "databases to restore, if not set, restore all databases")
restoreBackupCmd.Flags().StringVarP(&restoreDatabaseCollections, "database_collections", "a", "", "databases and collections to restore, json format: {\"db1\":[\"c1\", \"c2\"],\"db2\":[]}")

restoreBackupCmd.Flags().BoolVarP(&restoreMetaOnly, "meta_only", "", false, "if set true, will restore meta only")
restoreBackupCmd.Flags().BoolVarP(&restoreIndex, "restore_index", "", false, "if set true, will restore index")

rootCmd.AddCommand(restoreBackupCmd)
}
80 changes: 50 additions & 30 deletions core/backup_impl_restore_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ func (b *BackupContext) RestoreBackup(ctx context.Context, request *backuppb.Res
log.Info("receive RestoreBackupRequest",
zap.String("requestId", request.GetRequestId()),
zap.String("backupName", request.GetBackupName()),
zap.Bool("onlyMeta", request.GetMetaOnly()),
zap.Bool("restoreIndex", request.GetRestoreIndex()),
zap.Strings("collections", request.GetCollectionNames()),
zap.String("CollectionSuffix", request.GetCollectionSuffix()),
zap.Any("CollectionRenames", request.GetCollectionRenames()),
Expand Down Expand Up @@ -277,6 +279,8 @@ func (b *BackupContext) RestoreBackup(ctx context.Context, request *backuppb.Res
ToRestoreSize: toRestoreSize,
RestoredSize: 0,
Progress: 0,
MetaOnly: request.GetMetaOnly(),
RestoreIndex: request.GetRestoreIndex(),
}
restoreCollectionTasks = append(restoreCollectionTasks, restoreCollectionTask)
task.CollectionRestoreTasks = restoreCollectionTasks
Expand Down Expand Up @@ -424,6 +428,18 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup
zap.String("collectionName", targetCollectionName),
zap.Bool("hasPartitionKey", hasPartitionKey))

if task.GetRestoreIndex() {
indexes := task.GetCollBackup().GetIndexInfos()
for _, index := range indexes {
idx := entity.NewGenericIndex(index.GetIndexName(), entity.IndexType(index.GetIndexType()), index.GetFieldName(), index.GetParams())
err := b.getMilvusClient().CreateIndex(ctx, targetCollectionName, index.GetFieldName(), idx, true)
if err != nil {
log.Warn("Fail to restore index", zap.Error(err))
return task, err
}
}
}

tempDir := "restore-temp-" + parentTaskID + SEPERATOR
isSameBucket := b.milvusBucketName == backupBucketName
// clean the temporary file
Expand Down Expand Up @@ -491,31 +507,13 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup
return nil
}

groupIds := collectGroupIdsFromSegments(partitionBackup.GetSegmentBackups())
if len(groupIds) == 1 && groupIds[0] == 0 {
// backward compatible old backup without group id
files, err := b.getBackupPartitionPaths(ctx, backupBucketName, backupPath, partitionBackup)
if err != nil {
log.Error("fail to get partition backup binlog files",
zap.Error(err),
zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
zap.String("targetCollectionName", targetCollectionName),
zap.String("partition", partitionBackup.GetPartitionName()))
return task, err
}
err = copyAndBulkInsert(files)
if err != nil {
log.Error("fail to (copy and) bulkinsert data",
zap.Error(err),
zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
zap.String("targetCollectionName", targetCollectionName),
zap.String("partition", partitionBackup.GetPartitionName()))
return task, err
}
if task.GetMetaOnly() {
task.Progress = 100
} else {
// bulk insert by segment groups
for _, groupId := range groupIds {
files, err := b.getBackupPartitionPathsWithGroupID(ctx, backupBucketName, backupPath, partitionBackup, groupId)
groupIds := collectGroupIdsFromSegments(partitionBackup.GetSegmentBackups())
if len(groupIds) == 1 && groupIds[0] == 0 {
// backward compatible old backup without group id
files, err := b.getBackupPartitionPaths(ctx, backupBucketName, backupPath, partitionBackup)
if err != nil {
log.Error("fail to get partition backup binlog files",
zap.Error(err),
Expand All @@ -533,13 +531,35 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup
zap.String("partition", partitionBackup.GetPartitionName()))
return task, err
}
} else {
// bulk insert by segment groups
for _, groupId := range groupIds {
files, err := b.getBackupPartitionPathsWithGroupID(ctx, backupBucketName, backupPath, partitionBackup, groupId)
if err != nil {
log.Error("fail to get partition backup binlog files",
zap.Error(err),
zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
zap.String("targetCollectionName", targetCollectionName),
zap.String("partition", partitionBackup.GetPartitionName()))
return task, err
}
err = copyAndBulkInsert(files)
if err != nil {
log.Error("fail to (copy and) bulkinsert data",
zap.Error(err),
zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
zap.String("targetCollectionName", targetCollectionName),
zap.String("partition", partitionBackup.GetPartitionName()))
return task, err
}
}
}
task.RestoredSize = task.RestoredSize + partitionBackup.GetSize()
if task.ToRestoreSize == 0 {
task.Progress = 100
} else {
task.Progress = int32(100 * task.RestoredSize / task.ToRestoreSize)
}
}
task.RestoredSize = task.RestoredSize + partitionBackup.GetSize()
if task.ToRestoreSize == 0 {
task.Progress = 100
} else {
task.Progress = int32(100 * task.RestoredSize / task.ToRestoreSize)
}
}

Expand Down
8 changes: 8 additions & 0 deletions core/proto/backup.proto
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,10 @@ message RestoreBackupRequest {
string path = 8;
// database and collections to restore. A json string. To support database. 2023.7.7
google.protobuf.Value db_collections = 9;
// if true only restore meta
bool metaOnly = 10;
// if true restore index info
bool restoreIndex = 11;
}

message RestorePartitionTask {
Expand All @@ -276,6 +280,10 @@ message RestoreCollectionTask {
int64 to_restore_size = 10;
int32 progress = 11;
string target_db_name = 12;
// if true only restore meta
bool metaOnly = 13;
// if true restore index info
bool restoreIndex = 14;
}

message RestoreBackupTask {
Expand Down
Loading

0 comments on commit 1249083

Please sign in to comment.