Skip to content

Commit

Permalink
Support backup progress (#370)
Browse files Browse the repository at this point in the history
Signed-off-by: wayblink <[email protected]>
  • Loading branch information
wayblink authored Jun 27, 2024
1 parent 6a59295 commit 0e539a0
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 20 deletions.
3 changes: 1 addition & 2 deletions core/backup_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,7 @@ func (b *BackupContext) GetBackup(ctx context.Context, request *backuppb.GetBack
zap.String("backupName", request.GetBackupName()),
zap.String("backupId", request.GetBackupId()),
zap.String("bucketName", request.GetBucketName()),
zap.String("path", request.GetPath()),
zap.Any("resp", resp))
zap.String("path", request.GetPath()))
}

return resp
Expand Down
2 changes: 1 addition & 1 deletion core/backup_impl_create_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -888,7 +888,7 @@ func (b *BackupContext) copySegment(ctx context.Context, backupBinlogPath string
}
}
}
segment.Backuped = true
b.meta.UpdateSegment(segment.GetPartitionId(), segment.GetSegmentId(), setSegmentBackuped(true))
return nil
}

Expand Down
17 changes: 3 additions & 14 deletions core/backup_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"strings"

"github.com/golang/protobuf/proto"

"github.com/zilliztech/milvus-backup/core/proto/backuppb"
)

Expand Down Expand Up @@ -287,21 +286,11 @@ func SimpleBackupResponse(input *backuppb.BackupInfoResponse) *backuppb.BackupIn
// clone and remove PartitionBackups, avoid updating here every time we add a field in CollectionBackupInfo
clonedCollectionBackup := proto.Clone(coll).(*backuppb.CollectionBackupInfo)
clonedCollectionBackup.PartitionBackups = nil
clonedCollectionBackup.Schema = nil
collections = append(collections, clonedCollectionBackup)
}
simpleBackupInfo := &backuppb.BackupInfo{
Id: backup.GetId(),
Name: backup.GetName(),
StateCode: backup.GetStateCode(),
ErrorMessage: backup.GetErrorMessage(),
BackupTimestamp: backup.GetBackupTimestamp(),
CollectionBackups: collections,
MilvusVersion: backup.GetMilvusVersion(),
StartTime: backup.GetStartTime(),
EndTime: backup.GetEndTime(),
Progress: backup.GetProgress(),
Size: backup.GetSize(),
}
simpleBackupInfo := proto.Clone(backup).(*backuppb.BackupInfo)
simpleBackupInfo.CollectionBackups = collections
return &backuppb.BackupInfoResponse{
RequestId: input.GetRequestId(),
Code: input.GetCode(),
Expand Down
27 changes: 24 additions & 3 deletions core/backup_meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"sync"

"github.com/golang/protobuf/proto"
"go.uber.org/zap"

"github.com/zilliztech/milvus-backup/core/proto/backuppb"
"github.com/zilliztech/milvus-backup/internal/log"
)

type MetaManager struct {
Expand Down Expand Up @@ -64,6 +66,7 @@ func (meta *MetaManager) AddBackup(backup *backuppb.BackupInfo) {
meta.mu.Lock()
defer meta.mu.Unlock()
meta.backups[backup.Id] = backup
meta.backupNameToIdDict[backup.Name] = backup.Id
}

func (meta *MetaManager) AddCollection(collection *backuppb.CollectionBackupInfo) {
Expand Down Expand Up @@ -456,24 +459,42 @@ func (meta *MetaManager) GetFullMeta(id string) *backuppb.BackupInfo {
return nil
}
collections := meta.collections[id]
var backupedSize int64 = 0
var totalSize int64 = 0
cloneBackup := proto.Clone(backup).(*backuppb.BackupInfo)

collectionBackups := make([]*backuppb.CollectionBackupInfo, 0)
for collectionID, collection := range collections {
collectionBackup := proto.Clone(collection).(*backuppb.CollectionBackupInfo)
partitionBackups := make([]*backuppb.PartitionBackupInfo, 0)
for partitionID, partition := range meta.partitions[collectionID] {
segmentBackups := make([]*backuppb.SegmentBackupInfo, 0)
partitionBackup := proto.Clone(partition).(*backuppb.PartitionBackupInfo)
for _, segment := range meta.segments[partitionID] {
segmentBackups = append(segmentBackups, proto.Clone(segment).(*backuppb.SegmentBackupInfo))
if segment.Backuped {
backupedSize += segment.GetSize()
}
totalSize += segment.GetSize()
partitionBackup.Size = partitionBackup.Size + segment.GetSize()
}
partitionBackup := proto.Clone(partition).(*backuppb.PartitionBackupInfo)
partitionBackup.SegmentBackups = segmentBackups
partitionBackups = append(partitionBackups, partitionBackup)
collectionBackup.Size = collectionBackup.Size + partitionBackup.Size
}
collectionBackup := proto.Clone(collection).(*backuppb.CollectionBackupInfo)
collectionBackup.PartitionBackups = partitionBackups
collectionBackups = append(collectionBackups, collectionBackup)
cloneBackup.Size = cloneBackup.Size + collectionBackup.Size
}
cloneBackup := proto.Clone(backup).(*backuppb.BackupInfo)
cloneBackup.CollectionBackups = collectionBackups
cloneBackup.Progress = 1
if totalSize != 0 {
cloneBackup.Progress = int32(backupedSize * 100 / (totalSize))
} else {
cloneBackup.Progress = 100
}
log.Info("backup progress", zap.Int64("backupedSize", backupedSize), zap.Int64("totalSize", totalSize), zap.Int32("progress", cloneBackup.Progress))

return cloneBackup
}

Expand Down

0 comments on commit 0e539a0

Please sign in to comment.