Skip to content

Commit

Permalink
recall repo maintenance history on restart
Browse files Browse the repository at this point in the history
Signed-off-by: Lyndon-Li <[email protected]>
  • Loading branch information
Lyndon-Li committed Jan 7, 2025
1 parent ceeab10 commit f041572
Show file tree
Hide file tree
Showing 5 changed files with 378 additions and 40 deletions.
16 changes: 13 additions & 3 deletions pkg/cmd/cli/repomantenance/maintenance.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ import (
"github.com/spf13/pflag"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log/zap"

"github.com/vmware-tanzu/velero/internal/credentials"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
Expand Down Expand Up @@ -65,6 +68,8 @@ func (o *Options) Run(f velerocli.Factory) {
logger := logging.DefaultLogger(o.LogLevelFlag.Parse(), o.FormatFlag.Parse())
logger.SetOutput(os.Stdout)

ctrl.SetLogger(zap.New(zap.UseDevMode(true)))

time.Sleep(time.Minute)

pruneError := o.runRepoPrune(f, f.Namespace(), logger)
Expand Down Expand Up @@ -116,9 +121,9 @@ func (o *Options) initClient(f velerocli.Factory) (client.Client, error) {
return cli, nil
}

func initRepoManager(namespace string, cli client.Client, logger logrus.FieldLogger) (repomanager.Manager, error) {
func initRepoManager(namespace string, cli client.Client, kubeClient kubernetes.Interface, logger logrus.FieldLogger) (repomanager.Manager, error) {
// ensure the repo key secret is set up
if err := repokey.EnsureCommonRepositoryKey(cli, namespace); err != nil {
if err := repokey.EnsureCommonRepositoryKey(kubeClient.CoreV1(), namespace); err != nil {
return nil, errors.Wrap(err, "failed to ensure repository key")
}

Expand Down Expand Up @@ -155,7 +160,12 @@ func (o *Options) runRepoPrune(f velerocli.Factory, namespace string, logger log
return err
}

manager, err := initRepoManager(namespace, cli, logger)
kubeClient, err := f.KubeClient()
if err != nil {
return err
}

manager, err := initRepoManager(namespace, cli, kubeClient, logger)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ func (s *server) checkNodeAgent() {

func (s *server) initRepoManager() error {
// ensure the repo key secret is set up
if err := repokey.EnsureCommonRepositoryKey(s.mgr.GetClient(), s.namespace); err != nil {
if err := repokey.EnsureCommonRepositoryKey(s.kubeClient.CoreV1(), s.namespace); err != nil {
return err
}

Expand Down
51 changes: 23 additions & 28 deletions pkg/controller/backup_repository_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"slices"
"time"

"github.com/petar/GoLLRB/llrb"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -348,43 +349,45 @@ func (r *BackupRepoReconciler) recallMaintenance(ctx context.Context, req *veler
})
}

type maintenanceStatusWrapper struct {
status *velerov1api.BackupRepositoryMaintenanceStatus
}

func (w maintenanceStatusWrapper) Less(other llrb.Item) bool {
return w.status.StartTimestamp.Before(other.(maintenanceStatusWrapper).status.StartTimestamp)
}

func consolidateHistory(coming, cur []velerov1api.BackupRepositoryMaintenanceStatus) []velerov1api.BackupRepositoryMaintenanceStatus {
if len(coming) == 0 {
return nil
}

if isIdenticalHistory(coming, cur) {
if isIdenticalHistory(cur, coming) {
return nil
}

consolidator := llrb.New()
for i := range cur {
consolidator.ReplaceOrInsert(maintenanceStatusWrapper{&cur[i]})
}

for i := range coming {
consolidator.ReplaceOrInsert(maintenanceStatusWrapper{&coming[i]})
}

truncated := []velerov1api.BackupRepositoryMaintenanceStatus{}
i := len(cur) - 1
j := len(coming) - 1
for i >= 0 || j >= 0 {
for consolidator.Len() > 0 {
if len(truncated) == defaultMaintenanceStatusQueueLength {
break
}

if i >= 0 && j >= 0 {
if isEarlierMaintenanceStatus(cur[i], coming[j]) {
truncated = append(truncated, coming[j])
j--
} else {
truncated = append(truncated, cur[i])
i--
}
} else if i >= 0 {
truncated = append(truncated, cur[i])
i--
} else {
truncated = append(truncated, coming[j])
j--
}
item := consolidator.DeleteMax()
truncated = append(truncated, *item.(maintenanceStatusWrapper).status)
}

slices.Reverse(truncated)

if isIdenticalHistory(truncated, cur) {
if isIdenticalHistory(cur, truncated) {
return nil
}

Expand Down Expand Up @@ -421,10 +424,6 @@ func isIdenticalHistory(a, b []velerov1api.BackupRepositoryMaintenanceStatus) bo
return true
}

func isEarlierMaintenanceStatus(a, b velerov1api.BackupRepositoryMaintenanceStatus) bool {
return a.StartTimestamp.Before(b.StartTimestamp)
}

var funcStartMaintenanceJob = repository.StartMaintenanceJob
var funcWaitMaintenanceJobComplete = repository.WaitMaintenanceJobComplete

Expand All @@ -438,10 +437,6 @@ func (r *BackupRepoReconciler) runMaintenanceIfDue(ctx context.Context, req *vel

log.Info("Running maintenance on backup repository")

// prune failures should be displayed in the `.status.message` field but
// should not cause the repo to move to `NotReady`.
log.Debug("Pruning repo")

job, err := funcStartMaintenanceJob(r.Client, ctx, req, r.repoMaintenanceConfig, r.podResources, r.logLevel, r.logFormat, log)
if err != nil {
log.WithError(err).Warn("Starting repo maintenance failed")
Expand Down
Loading

0 comments on commit f041572

Please sign in to comment.