Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor pod volume context #8664

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions pkg/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ type kubernetesBackupper struct {
podCommandExecutor podexec.PodCommandExecutor
podVolumeBackupperFactory podvolume.BackupperFactory
podVolumeTimeout time.Duration
podVolumeContext context.Context
defaultVolumesToFsBackup bool
clientPageSize int
uploaderType string
Expand Down Expand Up @@ -308,12 +309,13 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
}
}

ctx, cancelFunc := context.WithTimeout(context.Background(), podVolumeTimeout)
var cancelFunc context.CancelFunc
kb.podVolumeContext, cancelFunc = context.WithTimeout(context.Background(), podVolumeTimeout)
Lyndon-Li marked this conversation as resolved.
Show resolved Hide resolved
defer cancelFunc()

var podVolumeBackupper podvolume.Backupper
if kb.podVolumeBackupperFactory != nil {
podVolumeBackupper, err = kb.podVolumeBackupperFactory.NewBackupper(ctx, log, backupRequest.Backup, kb.uploaderType)
podVolumeBackupper, err = kb.podVolumeBackupperFactory.NewBackupper(kb.podVolumeContext, log, backupRequest.Backup, kb.uploaderType)
if err != nil {
log.WithError(errors.WithStack(err)).Debugf("Error from NewBackupper")
return errors.WithStack(err)
Expand Down Expand Up @@ -489,7 +491,7 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
addNextToBlock := i < len(items)-1 && items[i].orderedResource && items[i+1].orderedResource && items[i].groupResource == items[i+1].groupResource
if itemBlock != nil && len(itemBlock.Items) > 0 && !addNextToBlock {
log.Infof("Backing Up Item Block including %s %s/%s (%v items in block)", items[i].groupResource.String(), items[i].namespace, items[i].name, len(itemBlock.Items))
backedUpGRs := kb.backupItemBlock(ctx, *itemBlock)
backedUpGRs := kb.backupItemBlock(*itemBlock)
for _, backedUpGR := range backedUpGRs {
backedUpGroupResources[backedUpGR] = true
}
Expand Down Expand Up @@ -661,7 +663,7 @@ func (kb *kubernetesBackupper) executeItemBlockActions(
}
}

func (kb *kubernetesBackupper) backupItemBlock(ctx context.Context, itemBlock BackupItemBlock) []schema.GroupResource {
func (kb *kubernetesBackupper) backupItemBlock(itemBlock BackupItemBlock) []schema.GroupResource {
// find pods in ItemBlock
// filter pods based on whether they still need to be backed up
// this list will be used to run pre/post hooks
Expand Down Expand Up @@ -703,7 +705,7 @@ func (kb *kubernetesBackupper) backupItemBlock(ctx context.Context, itemBlock Ba
if len(postHookPods) > 0 {
itemBlock.Log.Debug("Executing post hooks")
itemBlock.itemBackupper.hookTracker.AsyncItemBlocks.Add(1)
go kb.handleItemBlockPostHooks(ctx, itemBlock, postHookPods)
go kb.handleItemBlockPostHooks(itemBlock, postHookPods)
}

return grList
Expand Down Expand Up @@ -739,12 +741,12 @@ func (kb *kubernetesBackupper) handleItemBlockPreHooks(itemBlock BackupItemBlock
}

// The hooks cannot execute until the PVBs to be processed
func (kb *kubernetesBackupper) handleItemBlockPostHooks(ctx context.Context, itemBlock BackupItemBlock, hookPods []itemblock.ItemBlockItem) {
func (kb *kubernetesBackupper) handleItemBlockPostHooks(itemBlock BackupItemBlock, hookPods []itemblock.ItemBlockItem) {
log := itemBlock.Log
defer itemBlock.itemBackupper.hookTracker.AsyncItemBlocks.Done()

// the post hooks will not execute until all PVBs of the item block pods are processed
if err := kb.waitUntilPVBsProcessed(ctx, log, itemBlock, hookPods); err != nil {
if err := kb.waitUntilPVBsProcessed(kb.podVolumeContext, log, itemBlock, hookPods); err != nil {
log.WithError(err).Error("failed to wait PVBs processed for the ItemBlock")
return
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ type kubernetesRestorer struct {
namespaceClient corev1.NamespaceInterface
podVolumeRestorerFactory podvolume.RestorerFactory
podVolumeTimeout time.Duration
podVolumeContext go_context.Context
resourceTerminatingTimeout time.Duration
resourceTimeout time.Duration
resourcePriorities types.Priorities
Expand Down Expand Up @@ -249,12 +250,13 @@ func (kr *kubernetesRestorer) RestoreWithResolvers(
}
}

ctx, cancelFunc := go_context.WithTimeout(go_context.Background(), podVolumeTimeout)
var cancelFunc go_context.CancelFunc
kr.podVolumeContext, cancelFunc = go_context.WithTimeout(go_context.Background(), podVolumeTimeout)
defer cancelFunc()

var podVolumeRestorer podvolume.Restorer
if kr.podVolumeRestorerFactory != nil {
podVolumeRestorer, err = kr.podVolumeRestorerFactory.NewRestorer(ctx, req.Restore)
podVolumeRestorer, err = kr.podVolumeRestorerFactory.NewRestorer(kr.podVolumeContext, req.Restore)
if err != nil {
return results.Result{}, results.Result{Velero: []string{err.Error()}}
}
Expand Down
Loading