diff --git a/changelogs/unreleased/8580-Lyndon-Li b/changelogs/unreleased/8580-Lyndon-Li new file mode 100644 index 0000000000..87862b9529 --- /dev/null +++ b/changelogs/unreleased/8580-Lyndon-Li @@ -0,0 +1 @@ +Fix issue #7753, recall repo maintenance history on Velero server restart \ No newline at end of file diff --git a/go.mod b/go.mod index 1b72e62099..51e58a1a59 100644 --- a/go.mod +++ b/go.mod @@ -31,6 +31,7 @@ require ( github.com/kubernetes-csi/external-snapshotter/client/v7 v7.0.0 github.com/onsi/ginkgo/v2 v2.19.0 github.com/onsi/gomega v1.33.1 + github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.20.5 github.com/robfig/cron/v3 v3.0.1 @@ -154,7 +155,6 @@ require ( github.com/natefinch/atomic v1.0.1 // indirect github.com/nxadm/tail v1.4.8 // indirect github.com/oklog/run v1.0.0 // indirect - github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9 // indirect github.com/pierrec/lz4 v2.6.1+incompatible // indirect github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect diff --git a/pkg/cmd/cli/repomantenance/maintenance.go b/pkg/cmd/cli/repomantenance/maintenance.go index e467c054a9..56a88de7ec 100644 --- a/pkg/cmd/cli/repomantenance/maintenance.go +++ b/pkg/cmd/cli/repomantenance/maintenance.go @@ -5,28 +5,35 @@ import ( "fmt" "os" "strings" + "time" + "github.com/bombsimon/logrusr/v3" "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/spf13/pflag" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" + ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/vmware-tanzu/velero/internal/credentials" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" velerocli "github.com/vmware-tanzu/velero/pkg/client" "github.com/vmware-tanzu/velero/pkg/repository" - 
"github.com/vmware-tanzu/velero/pkg/repository/provider" "github.com/vmware-tanzu/velero/pkg/util/filesystem" "github.com/vmware-tanzu/velero/pkg/util/logging" + + repokey "github.com/vmware-tanzu/velero/pkg/repository/keys" + repomanager "github.com/vmware-tanzu/velero/pkg/repository/manager" ) type Options struct { RepoName string BackupStorageLocation string RepoType string + ResourceTimeout time.Duration LogLevelFlag *logging.LevelFlag FormatFlag *logging.FormatFlag } @@ -61,6 +68,8 @@ func (o *Options) Run(f velerocli.Factory) { logger := logging.DefaultLogger(o.LogLevelFlag.Parse(), o.FormatFlag.Parse()) logger.SetOutput(os.Stdout) + ctrl.SetLogger(logrusr.New(logger)) + pruneError := o.runRepoPrune(f, f.Namespace(), logger) defer func() { if pruneError != nil { @@ -110,12 +119,14 @@ func (o *Options) initClient(f velerocli.Factory) (client.Client, error) { return cli, nil } -func (o *Options) runRepoPrune(f velerocli.Factory, namespace string, logger logrus.FieldLogger) error { - cli, err := o.initClient(f) - if err != nil { - return err +func initRepoManager(namespace string, cli client.Client, kubeClient kubernetes.Interface, logger logrus.FieldLogger) (repomanager.Manager, error) { + // ensure the repo key secret is set up + if err := repokey.EnsureCommonRepositoryKey(kubeClient.CoreV1(), namespace); err != nil { + return nil, errors.Wrap(err, "failed to ensure repository key") } + repoLocker := repository.NewRepoLocker() + credentialFileStore, err := credentials.NewNamespacedFileStore( cli, namespace, @@ -123,23 +134,38 @@ func (o *Options) runRepoPrune(f velerocli.Factory, namespace string, logger log filesystem.NewFileSystem(), ) if err != nil { - return errors.Wrap(err, "failed to create namespaced file store") + return nil, errors.Wrap(err, "failed to create namespaced file store") } credentialSecretStore, err := credentials.NewNamespacedSecretStore(cli, namespace) if err != nil { - return errors.Wrap(err, "failed to create namespaced secret store") 
+ return nil, errors.Wrap(err, "failed to create namespaced secret store") } - var repoProvider provider.Provider - if o.RepoType == velerov1api.BackupRepositoryTypeRestic { - repoProvider = provider.NewResticRepositoryProvider(credentialFileStore, filesystem.NewFileSystem(), logger) - } else { - repoProvider = provider.NewUnifiedRepoProvider( - credentials.CredentialGetter{ - FromFile: credentialFileStore, - FromSecret: credentialSecretStore, - }, o.RepoType, logger) + return repomanager.NewManager( + namespace, + cli, + repoLocker, + credentialFileStore, + credentialSecretStore, + logger, + ), nil +} + +func (o *Options) runRepoPrune(f velerocli.Factory, namespace string, logger logrus.FieldLogger) error { + cli, err := o.initClient(f) + if err != nil { + return err + } + + kubeClient, err := f.KubeClient() + if err != nil { + return err + } + + manager, err := initRepoManager(namespace, cli, kubeClient, logger) + if err != nil { + return err } // backupRepository @@ -149,31 +175,14 @@ func (o *Options) runRepoPrune(f velerocli.Factory, namespace string, logger log BackupLocation: o.BackupStorageLocation, RepositoryType: o.RepoType, }, true) - if err != nil { return errors.Wrap(err, "failed to get backup repository") } - // bsl - bsl := &velerov1api.BackupStorageLocation{} - err = cli.Get(context.Background(), client.ObjectKey{Namespace: namespace, Name: repo.Spec.BackupStorageLocation}, bsl) - if err != nil { - return errors.Wrap(err, "failed to get backup storage location") - } - - para := provider.RepoParam{ - BackupRepo: repo, - BackupLocation: bsl, - } - - err = repoProvider.BoostRepoConnect(context.Background(), para) - if err != nil { - return errors.Wrap(err, "failed to boost repo connect") - } - - err = repoProvider.PruneRepo(context.Background(), para) + err = manager.PruneRepo(repo) if err != nil { return errors.Wrap(err, "failed to prune repo") } + return nil } diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index 
d9f6960999..396970b92a 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -502,15 +502,9 @@ func (s *server) initRepoManager() error { s.namespace, s.mgr.GetClient(), s.repoLocker, - s.repoEnsurer, s.credentialFileStore, s.credentialSecretStore, - s.config.RepoMaintenanceJobConfig, - s.config.PodResources, - s.config.KeepLatestMaintenanceJobs, s.logger, - s.logLevel, - s.config.LogFormat, ) return nil @@ -731,9 +725,14 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string s.namespace, s.logger, s.mgr.GetClient(), + s.repoManager, s.config.RepoMaintenanceFrequency, s.config.BackupRepoConfig, - s.repoManager, + s.config.KeepLatestMaintenanceJobs, + s.config.RepoMaintenanceJobConfig, + s.config.PodResources, + s.logLevel, + s.config.LogFormat, ).SetupWithManager(s.mgr); err != nil { s.logger.Fatal(err, "unable to create controller", "controller", constant.ControllerBackupRepo) } diff --git a/pkg/controller/backup_repository_controller.go b/pkg/controller/backup_repository_controller.go index d41f547796..fe59529d48 100644 --- a/pkg/controller/backup_repository_controller.go +++ b/pkg/controller/backup_repository_controller.go @@ -22,8 +22,10 @@ import ( "encoding/json" "fmt" "reflect" + "slices" "time" + "github.com/petar/GoLLRB/llrb" "github.com/pkg/errors" "github.com/sirupsen/logrus" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -41,8 +43,10 @@ import ( "github.com/vmware-tanzu/velero/pkg/constant" "github.com/vmware-tanzu/velero/pkg/label" repoconfig "github.com/vmware-tanzu/velero/pkg/repository/config" + "github.com/vmware-tanzu/velero/pkg/repository/maintenance" repomanager "github.com/vmware-tanzu/velero/pkg/repository/manager" "github.com/vmware-tanzu/velero/pkg/util/kube" + "github.com/vmware-tanzu/velero/pkg/util/logging" ) const ( @@ -53,16 +57,22 @@ const ( type BackupRepoReconciler struct { client.Client - namespace string - logger logrus.FieldLogger - clock clocks.WithTickerAndDelayedExecution - 
maintenanceFrequency time.Duration - backupRepoConfig string - repositoryManager repomanager.Manager + namespace string + logger logrus.FieldLogger + clock clocks.WithTickerAndDelayedExecution + maintenanceFrequency time.Duration + backupRepoConfig string + repositoryManager repomanager.Manager + keepLatestMaintenanceJobs int + repoMaintenanceConfig string + maintenanceJobResources kube.PodResources + logLevel logrus.Level + logFormat *logging.FormatFlag } -func NewBackupRepoReconciler(namespace string, logger logrus.FieldLogger, client client.Client, - maintenanceFrequency time.Duration, backupRepoConfig string, repositoryManager repomanager.Manager) *BackupRepoReconciler { +func NewBackupRepoReconciler(namespace string, logger logrus.FieldLogger, client client.Client, repositoryManager repomanager.Manager, + maintenanceFrequency time.Duration, backupRepoConfig string, keepLatestMaintenanceJobs int, repoMaintenanceConfig string, maintenanceJobResources kube.PodResources, + logLevel logrus.Level, logFormat *logging.FormatFlag) *BackupRepoReconciler { c := &BackupRepoReconciler{ client, namespace, @@ -71,6 +81,11 @@ func NewBackupRepoReconciler(namespace string, logger logrus.FieldLogger, client maintenanceFrequency, backupRepoConfig, repositoryManager, + keepLatestMaintenanceJobs, + repoMaintenanceConfig, + maintenanceJobResources, + logLevel, + logFormat, } return c @@ -206,7 +221,17 @@ func (r *BackupRepoReconciler) Reconcile(ctx context.Context, req ctrl.Request) } fallthrough case velerov1api.BackupRepositoryPhaseReady: - return ctrl.Result{}, r.runMaintenanceIfDue(ctx, backupRepo, log) + if err := r.recallMaintenance(ctx, backupRepo, log); err != nil { + return ctrl.Result{}, errors.Wrap(err, "error handling incomplete repo maintenance jobs") + } + + if err := r.runMaintenanceIfDue(ctx, backupRepo, log); err != nil { + return ctrl.Result{}, errors.Wrap(err, "error check and run repo maintenance jobs") + } + + if err := maintenance.DeleteOldJobs(r.Client, 
req.Name, r.keepLatestMaintenanceJobs); err != nil { + log.WithError(err).Warn("Failed to delete old maintenance jobs") + } } return ctrl.Result{}, nil @@ -299,6 +324,99 @@ func ensureRepo(repo *velerov1api.BackupRepository, repoManager repomanager.Mana return repoManager.PrepareRepo(repo) } +func (r *BackupRepoReconciler) recallMaintenance(ctx context.Context, req *velerov1api.BackupRepository, log logrus.FieldLogger) error { + history, err := maintenance.WaitAllJobsComplete(ctx, r.Client, req, defaultMaintenanceStatusQueueLength, log) + if err != nil { + return errors.Wrapf(err, "error waiting incomplete repo maintenance job for repo %s", req.Name) + } + + consolidated := consolidateHistory(history, req.Status.RecentMaintenance) + if consolidated == nil { + return nil + } + + lastMaintenanceTime := getLastMaintenanceTimeFromHistory(consolidated) + + log.Warn("Updating backup repository because of unrecorded histories") + + return r.patchBackupRepository(ctx, req, func(rr *velerov1api.BackupRepository) { + if lastMaintenanceTime != nil && (rr.Status.LastMaintenanceTime == nil || lastMaintenanceTime.After(rr.Status.LastMaintenanceTime.Time)) { + log.Warnf("Updating backup repository last maintenance time (%v) from history (%v)", rr.Status.LastMaintenanceTime, lastMaintenanceTime) + rr.Status.LastMaintenanceTime = lastMaintenanceTime + } + + rr.Status.RecentMaintenance = consolidated + }) +} + +type maintenanceStatusWrapper struct { + status *velerov1api.BackupRepositoryMaintenanceStatus +} + +func (w maintenanceStatusWrapper) Less(other llrb.Item) bool { + return w.status.StartTimestamp.Before(other.(maintenanceStatusWrapper).status.StartTimestamp) +} + +func consolidateHistory(coming, cur []velerov1api.BackupRepositoryMaintenanceStatus) []velerov1api.BackupRepositoryMaintenanceStatus { + if len(coming) == 0 { + return nil + } + + if slices.EqualFunc(cur, coming, func(a, b velerov1api.BackupRepositoryMaintenanceStatus) bool { + return a.StartTimestamp.Equal(b.StartTimestamp) + }) { + return nil + } + + consolidator := 
llrb.New() + for i := range cur { + consolidator.ReplaceOrInsert(maintenanceStatusWrapper{&cur[i]}) + } + + for i := range coming { + consolidator.ReplaceOrInsert(maintenanceStatusWrapper{&coming[i]}) + } + + truncated := []velerov1api.BackupRepositoryMaintenanceStatus{} + for consolidator.Len() > 0 { + if len(truncated) == defaultMaintenanceStatusQueueLength { + break + } + + item := consolidator.DeleteMax() + truncated = append(truncated, *item.(maintenanceStatusWrapper).status) + } + + slices.Reverse(truncated) + + if slices.EqualFunc(cur, truncated, func(a, b velerov1api.BackupRepositoryMaintenanceStatus) bool { + return a.StartTimestamp.Equal(b.StartTimestamp) + }) { + return nil + } + + return truncated +} + +func getLastMaintenanceTimeFromHistory(history []velerov1api.BackupRepositoryMaintenanceStatus) *metav1.Time { + var latest *metav1.Time + + for i := range history { + if history[i].CompleteTimestamp == nil { + continue + } + + if latest == nil || latest.Before(history[i].CompleteTimestamp) { + latest = history[i].CompleteTimestamp + } + } + + return latest +} + +var funcStartMaintenanceJob = maintenance.StartNewJob +var funcWaitMaintenanceJobComplete = maintenance.WaitJobComplete + func (r *BackupRepoReconciler) runMaintenanceIfDue(ctx context.Context, req *velerov1api.BackupRepository, log logrus.FieldLogger) error { startTime := r.clock.Now() @@ -309,29 +427,39 @@ func (r *BackupRepoReconciler) runMaintenanceIfDue(ctx context.Context, req *vel log.Info("Running maintenance on backup repository") - // prune failures should be displayed in the `.status.message` field but - // should not cause the repo to move to `NotReady`. 
- log.Debug("Pruning repo") + job, err := funcStartMaintenanceJob(r.Client, ctx, req, r.repoMaintenanceConfig, r.maintenanceJobResources, r.logLevel, r.logFormat, log) + if err != nil { + log.WithError(err).Warn("Starting repo maintenance failed") + return r.patchBackupRepository(ctx, req, func(rr *velerov1api.BackupRepository) { + updateRepoMaintenanceHistory(rr, velerov1api.BackupRepositoryMaintenanceFailed, &metav1.Time{Time: startTime}, nil, fmt.Sprintf("Failed to start maintenance job, err: %v", err)) + }) + } + + // When WaitMaintenanceJobComplete fails, the maintenance result will be left aside temporarily. + // If the maintenance still completes later, recallMaintenance recalls the left ones and updates LastMaintenanceTime and history + status, err := funcWaitMaintenanceJobComplete(r.Client, ctx, job, r.namespace, log) + if err != nil { + return errors.Wrapf(err, "error waiting repo maintenance completion status") + } - if err := r.repositoryManager.PruneRepo(req); err != nil { - log.WithError(err).Warn("error pruning repository") + if status.Result == velerov1api.BackupRepositoryMaintenanceFailed { + log.WithError(errors.New(status.Message)).Warn("Pruning repository failed") return r.patchBackupRepository(ctx, req, func(rr *velerov1api.BackupRepository) { - updateRepoMaintenanceHistory(rr, velerov1api.BackupRepositoryMaintenanceFailed, startTime, r.clock.Now(), err.Error()) + updateRepoMaintenanceHistory(rr, velerov1api.BackupRepositoryMaintenanceFailed, status.StartTimestamp, status.CompleteTimestamp, status.Message) }) } return r.patchBackupRepository(ctx, req, func(rr *velerov1api.BackupRepository) { - completionTime := r.clock.Now() - rr.Status.LastMaintenanceTime = &metav1.Time{Time: completionTime} - updateRepoMaintenanceHistory(rr, velerov1api.BackupRepositoryMaintenanceSucceeded, startTime, completionTime, "") + rr.Status.LastMaintenanceTime = &metav1.Time{Time: status.CompleteTimestamp.Time} + updateRepoMaintenanceHistory(rr, 
velerov1api.BackupRepositoryMaintenanceSucceeded, status.StartTimestamp, status.CompleteTimestamp, status.Message) }) } -func updateRepoMaintenanceHistory(repo *velerov1api.BackupRepository, result velerov1api.BackupRepositoryMaintenanceResult, startTime time.Time, completionTime time.Time, message string) { +func updateRepoMaintenanceHistory(repo *velerov1api.BackupRepository, result velerov1api.BackupRepositoryMaintenanceResult, startTime, completionTime *metav1.Time, message string) { latest := velerov1api.BackupRepositoryMaintenanceStatus{ Result: result, - StartTimestamp: &metav1.Time{Time: startTime}, - CompleteTimestamp: &metav1.Time{Time: completionTime}, + StartTimestamp: startTime, + CompleteTimestamp: completionTime, Message: message, } diff --git a/pkg/controller/backup_repository_controller_test.go b/pkg/controller/backup_repository_controller_test.go index 376b17ce84..cb763fb8f7 100644 --- a/pkg/controller/backup_repository_controller_test.go +++ b/pkg/controller/backup_repository_controller_test.go @@ -19,37 +19,51 @@ import ( "testing" "time" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/clock" ctrl "sigs.k8s.io/controller-runtime" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + "github.com/vmware-tanzu/velero/pkg/builder" + "github.com/vmware-tanzu/velero/pkg/repository/maintenance" repomokes "github.com/vmware-tanzu/velero/pkg/repository/mocks" repotypes "github.com/vmware-tanzu/velero/pkg/repository/types" velerotest "github.com/vmware-tanzu/velero/pkg/test" + "github.com/vmware-tanzu/velero/pkg/util/kube" + "github.com/vmware-tanzu/velero/pkg/util/logging" + "sigs.k8s.io/controller-runtime/pkg/client" clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake" + + batchv1 "k8s.io/api/batch/v1" ) const 
testMaintenanceFrequency = 10 * time.Minute -func mockBackupRepoReconciler(t *testing.T, mockOn string, arg interface{}, ret interface{}) *BackupRepoReconciler { +func mockBackupRepoReconciler(t *testing.T, mockOn string, arg interface{}, ret ...interface{}) *BackupRepoReconciler { t.Helper() mgr := &repomokes.Manager{} if mockOn != "" { - mgr.On(mockOn, arg).Return(ret) + mgr.On(mockOn, arg).Return(ret...) } return NewBackupRepoReconciler( velerov1api.DefaultNamespace, velerotest.NewLogger(), velerotest.NewFakeControllerRuntimeClient(t), + mgr, testMaintenanceFrequency, "fake-repo-config", - mgr, + 3, + "", + kube.PodResources{}, + logrus.InfoLevel, + nil, ) } @@ -104,21 +118,280 @@ func TestCheckNotReadyRepo(t *testing.T) { assert.Equal(t, "s3:test.amazonaws.com/bucket/restic/volume-ns-1", rr.Spec.ResticIdentifier) } +func startMaintenanceJobFail(client.Client, context.Context, *velerov1api.BackupRepository, string, kube.PodResources, logrus.Level, *logging.FormatFlag, logrus.FieldLogger) (string, error) { + return "", errors.New("fake-start-error") +} + +func startMaintenanceJobSucceed(client.Client, context.Context, *velerov1api.BackupRepository, string, kube.PodResources, logrus.Level, *logging.FormatFlag, logrus.FieldLogger) (string, error) { + return "fake-job-name", nil +} + +func waitMaintenanceJobCompleteFail(client.Client, context.Context, string, string, logrus.FieldLogger) (velerov1api.BackupRepositoryMaintenanceStatus, error) { + return velerov1api.BackupRepositoryMaintenanceStatus{}, errors.New("fake-wait-error") +} + +func waitMaintenanceJobCompleteFunc(now time.Time, result velerov1api.BackupRepositoryMaintenanceResult, message string) func(client.Client, context.Context, string, string, logrus.FieldLogger) (velerov1api.BackupRepositoryMaintenanceStatus, error) { + return func(client.Client, context.Context, string, string, logrus.FieldLogger) (velerov1api.BackupRepositoryMaintenanceStatus, error) { + return 
velerov1api.BackupRepositoryMaintenanceStatus{ + StartTimestamp: &metav1.Time{Time: now}, + CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour)}, + Result: result, + Message: message, + }, nil + } +} + +type fakeClock struct { + now time.Time +} + +func (f *fakeClock) After(time.Duration) <-chan time.Time { + return nil +} + +func (f *fakeClock) NewTicker(time.Duration) clock.Ticker { + return nil +} +func (f *fakeClock) NewTimer(time.Duration) clock.Timer { + return nil +} + +func (f *fakeClock) Now() time.Time { + return f.now +} + +func (f *fakeClock) Since(time.Time) time.Duration { + return 0 +} + +func (f *fakeClock) Sleep(time.Duration) {} + +func (f *fakeClock) Tick(time.Duration) <-chan time.Time { + return nil +} + +func (f *fakeClock) AfterFunc(time.Duration, func()) clock.Timer { + return nil +} + func TestRunMaintenanceIfDue(t *testing.T) { - rr := mockBackupRepositoryCR() - reconciler := mockBackupRepoReconciler(t, "PruneRepo", rr, nil) - err := reconciler.Client.Create(context.TODO(), rr) - assert.NoError(t, err) - lastTm := rr.Status.LastMaintenanceTime - err = reconciler.runMaintenanceIfDue(context.TODO(), rr, reconciler.logger) - assert.NoError(t, err) - assert.NotEqual(t, rr.Status.LastMaintenanceTime, lastTm) + now := time.Now().Round(time.Second) - rr.Status.LastMaintenanceTime = &metav1.Time{Time: time.Now()} - lastTm = rr.Status.LastMaintenanceTime - err = reconciler.runMaintenanceIfDue(context.TODO(), rr, reconciler.logger) - assert.NoError(t, err) - assert.Equal(t, rr.Status.LastMaintenanceTime, lastTm) + tests := []struct { + name string + repo *velerov1api.BackupRepository + startJobFunc func(client.Client, context.Context, *velerov1api.BackupRepository, string, kube.PodResources, logrus.Level, *logging.FormatFlag, logrus.FieldLogger) (string, error) + waitJobFunc func(client.Client, context.Context, string, string, logrus.FieldLogger) (velerov1api.BackupRepositoryMaintenanceStatus, error) + expectedMaintenanceTime time.Time + 
expectedHistory []velerov1api.BackupRepositoryMaintenanceStatus + expectedErr string + }{ + { + name: "not due", + repo: &velerov1api.BackupRepository{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: velerov1api.DefaultNamespace, + Name: "repo", + }, + Spec: velerov1api.BackupRepositorySpec{ + MaintenanceFrequency: metav1.Duration{Duration: time.Hour}, + }, + Status: velerov1api.BackupRepositoryStatus{ + LastMaintenanceTime: &metav1.Time{Time: now}, + RecentMaintenance: []velerov1api.BackupRepositoryMaintenanceStatus{ + { + StartTimestamp: &metav1.Time{Time: now.Add(-time.Hour)}, + CompleteTimestamp: &metav1.Time{Time: now}, + Result: velerov1api.BackupRepositoryMaintenanceSucceeded, + }, + }, + }, + }, + expectedMaintenanceTime: now, + expectedHistory: []velerov1api.BackupRepositoryMaintenanceStatus{ + { + StartTimestamp: &metav1.Time{Time: now.Add(-time.Hour)}, + CompleteTimestamp: &metav1.Time{Time: now}, + Result: velerov1api.BackupRepositoryMaintenanceSucceeded, + }, + }, + }, + { + name: "start failed", + repo: &velerov1api.BackupRepository{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: velerov1api.DefaultNamespace, + Name: "repo", + }, + Spec: velerov1api.BackupRepositorySpec{ + MaintenanceFrequency: metav1.Duration{Duration: time.Hour}, + }, + Status: velerov1api.BackupRepositoryStatus{ + LastMaintenanceTime: &metav1.Time{Time: now.Add(-time.Hour - time.Minute)}, + RecentMaintenance: []velerov1api.BackupRepositoryMaintenanceStatus{ + { + StartTimestamp: &metav1.Time{Time: now.Add(-time.Hour * 2)}, + CompleteTimestamp: &metav1.Time{Time: now.Add(-time.Hour)}, + Result: velerov1api.BackupRepositoryMaintenanceSucceeded, + }, + }, + }, + }, + startJobFunc: startMaintenanceJobFail, + expectedMaintenanceTime: now.Add(-time.Hour - time.Minute), + expectedHistory: []velerov1api.BackupRepositoryMaintenanceStatus{ + { + StartTimestamp: &metav1.Time{Time: now.Add(-time.Hour * 2)}, + CompleteTimestamp: &metav1.Time{Time: now.Add(-time.Hour)}, + Result: 
velerov1api.BackupRepositoryMaintenanceSucceeded, + }, + { + StartTimestamp: &metav1.Time{Time: now}, + Result: velerov1api.BackupRepositoryMaintenanceFailed, + Message: "Failed to start maintenance job, err: fake-start-error", + }, + }, + }, + { + name: "wait failed", + repo: &velerov1api.BackupRepository{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: velerov1api.DefaultNamespace, + Name: "repo", + }, + Spec: velerov1api.BackupRepositorySpec{ + MaintenanceFrequency: metav1.Duration{Duration: time.Hour}, + }, + Status: velerov1api.BackupRepositoryStatus{ + LastMaintenanceTime: &metav1.Time{Time: now.Add(-time.Hour - time.Minute)}, + RecentMaintenance: []velerov1api.BackupRepositoryMaintenanceStatus{ + { + StartTimestamp: &metav1.Time{Time: now.Add(-time.Hour * 2)}, + CompleteTimestamp: &metav1.Time{Time: now.Add(-time.Hour)}, + Result: velerov1api.BackupRepositoryMaintenanceSucceeded, + }, + }, + }, + }, + startJobFunc: startMaintenanceJobSucceed, + waitJobFunc: waitMaintenanceJobCompleteFail, + expectedErr: "error waiting repo maintenance completion status: fake-wait-error", + expectedMaintenanceTime: now.Add(-time.Hour - time.Minute), + expectedHistory: []velerov1api.BackupRepositoryMaintenanceStatus{ + { + StartTimestamp: &metav1.Time{Time: now.Add(-time.Hour * 2)}, + CompleteTimestamp: &metav1.Time{Time: now.Add(-time.Hour)}, + Result: velerov1api.BackupRepositoryMaintenanceSucceeded, + }, + }, + }, + { + name: "maintenance failed", + repo: &velerov1api.BackupRepository{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: velerov1api.DefaultNamespace, + Name: "repo", + }, + Spec: velerov1api.BackupRepositorySpec{ + MaintenanceFrequency: metav1.Duration{Duration: time.Hour}, + }, + Status: velerov1api.BackupRepositoryStatus{ + LastMaintenanceTime: &metav1.Time{Time: now.Add(-time.Hour - time.Minute)}, + RecentMaintenance: []velerov1api.BackupRepositoryMaintenanceStatus{ + { + StartTimestamp: &metav1.Time{Time: now.Add(-time.Hour * 2)}, + CompleteTimestamp: 
&metav1.Time{Time: now.Add(-time.Hour)}, + Result: velerov1api.BackupRepositoryMaintenanceSucceeded, + }, + }, + }, + }, + startJobFunc: startMaintenanceJobSucceed, + waitJobFunc: waitMaintenanceJobCompleteFunc(now, velerov1api.BackupRepositoryMaintenanceFailed, "fake-maintenance-message"), + expectedMaintenanceTime: now.Add(-time.Hour - time.Minute), + expectedHistory: []velerov1api.BackupRepositoryMaintenanceStatus{ + { + StartTimestamp: &metav1.Time{Time: now.Add(-time.Hour * 2)}, + CompleteTimestamp: &metav1.Time{Time: now.Add(-time.Hour)}, + Result: velerov1api.BackupRepositoryMaintenanceSucceeded, + }, + { + StartTimestamp: &metav1.Time{Time: now}, + CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour)}, + Result: velerov1api.BackupRepositoryMaintenanceFailed, + Message: "fake-maintenance-message", + }, + }, + }, + { + name: "maintenance succeeded", + repo: &velerov1api.BackupRepository{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: velerov1api.DefaultNamespace, + Name: "repo", + }, + Spec: velerov1api.BackupRepositorySpec{ + MaintenanceFrequency: metav1.Duration{Duration: time.Hour}, + }, + Status: velerov1api.BackupRepositoryStatus{ + LastMaintenanceTime: &metav1.Time{Time: now.Add(-time.Hour - time.Minute)}, + RecentMaintenance: []velerov1api.BackupRepositoryMaintenanceStatus{ + { + StartTimestamp: &metav1.Time{Time: now.Add(-time.Hour * 2)}, + CompleteTimestamp: &metav1.Time{Time: now.Add(-time.Hour)}, + Result: velerov1api.BackupRepositoryMaintenanceSucceeded, + }, + }, + }, + }, + startJobFunc: startMaintenanceJobSucceed, + waitJobFunc: waitMaintenanceJobCompleteFunc(now, velerov1api.BackupRepositoryMaintenanceSucceeded, ""), + expectedMaintenanceTime: now.Add(time.Hour), + expectedHistory: []velerov1api.BackupRepositoryMaintenanceStatus{ + { + StartTimestamp: &metav1.Time{Time: now.Add(-time.Hour * 2)}, + CompleteTimestamp: &metav1.Time{Time: now.Add(-time.Hour)}, + Result: velerov1api.BackupRepositoryMaintenanceSucceeded, + }, + { + 
StartTimestamp: &metav1.Time{Time: now}, + CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour)}, + Result: velerov1api.BackupRepositoryMaintenanceSucceeded, + }, + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + reconciler := mockBackupRepoReconciler(t, "", test.repo, nil) + reconciler.clock = &fakeClock{now} + err := reconciler.Client.Create(context.TODO(), test.repo) + assert.NoError(t, err) + + funcStartMaintenanceJob = test.startJobFunc + funcWaitMaintenanceJobComplete = test.waitJobFunc + + err = reconciler.runMaintenanceIfDue(context.TODO(), test.repo, velerotest.NewLogger()) + if test.expectedErr == "" { + assert.NoError(t, err) + } + + assert.Equal(t, test.expectedMaintenanceTime, test.repo.Status.LastMaintenanceTime.Time) + assert.Len(t, test.repo.Status.RecentMaintenance, len(test.expectedHistory)) + + for i := 0; i < len(test.expectedHistory); i++ { + assert.Equal(t, test.expectedHistory[i].StartTimestamp.Time, test.repo.Status.RecentMaintenance[i].StartTimestamp.Time) + if test.expectedHistory[i].CompleteTimestamp == nil { + assert.Nil(t, test.repo.Status.RecentMaintenance[i].CompleteTimestamp) + } else { + assert.Equal(t, test.expectedHistory[i].CompleteTimestamp.Time, test.repo.Status.RecentMaintenance[i].CompleteTimestamp.Time) + } + + assert.Equal(t, test.expectedHistory[i].Result, test.repo.Status.RecentMaintenance[i].Result) + assert.Equal(t, test.expectedHistory[i].Message, test.repo.Status.RecentMaintenance[i].Message) + } + }) + } } func TestInitializeRepo(t *testing.T) { @@ -248,9 +521,14 @@ func TestGetRepositoryMaintenanceFrequency(t *testing.T) { velerov1api.DefaultNamespace, velerotest.NewLogger(), velerotest.NewFakeControllerRuntimeClient(t), + &mgr, test.userDefinedFreq, "", - &mgr, + 3, + "", + kube.PodResources{}, + logrus.InfoLevel, + nil, ) freq := reconciler.getRepositoryMaintenanceFrequency(test.repo) @@ -377,7 +655,14 @@ func TestNeedInvalidBackupRepo(t *testing.T) { 
velerov1api.DefaultNamespace, velerotest.NewLogger(), velerotest.NewFakeControllerRuntimeClient(t), - time.Duration(0), "", nil) + nil, + time.Duration(0), + "", + 3, + "", + kube.PodResources{}, + logrus.InfoLevel, + nil) need := reconciler.needInvalidBackupRepo(test.oldBSL, test.newBSL) assert.Equal(t, test.expect, need) @@ -653,7 +938,7 @@ func TestUpdateRepoMaintenanceHistory(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - updateRepoMaintenanceHistory(test.backupRepo, test.result, standardTime, standardTime.Add(time.Hour), "fake-message-0") + updateRepoMaintenanceHistory(test.backupRepo, test.result, &metav1.Time{Time: standardTime}, &metav1.Time{Time: standardTime.Add(time.Hour)}, "fake-message-0") for at := range test.backupRepo.Status.RecentMaintenance { assert.Equal(t, test.expectedHistory[at].StartTimestamp.Time, test.backupRepo.Status.RecentMaintenance[at].StartTimestamp.Time) @@ -663,3 +948,467 @@ func TestUpdateRepoMaintenanceHistory(t *testing.T) { }) } } + +func TestRecallMaintenance(t *testing.T) { + now := time.Now().Round(time.Second) + + schemeFail := runtime.NewScheme() + velerov1api.AddToScheme(schemeFail) + + scheme := runtime.NewScheme() + batchv1.AddToScheme(scheme) + corev1.AddToScheme(scheme) + velerov1api.AddToScheme(scheme) + + jobSucceeded := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "job1", + Namespace: velerov1api.DefaultNamespace, + Labels: map[string]string{maintenance.RepositoryNameLabel: "repo"}, + CreationTimestamp: metav1.Time{Time: now.Add(time.Hour)}, + }, + Status: batchv1.JobStatus{ + StartTime: &metav1.Time{Time: now.Add(time.Hour)}, + CompletionTime: &metav1.Time{Time: now.Add(time.Hour * 2)}, + Succeeded: 1, + }, + } + + jobPodSucceeded := builder.ForPod(velerov1api.DefaultNamespace, "job1").Labels(map[string]string{"job-name": "job1"}).ContainerStatuses(&corev1.ContainerStatus{ + State: corev1.ContainerState{ + Terminated: &corev1.ContainerStateTerminated{}, + }, + 
}).Result() + + tests := []struct { + name string + kubeClientObj []runtime.Object + runtimeScheme *runtime.Scheme + repoLastMatainTime metav1.Time + expectNewHistory []velerov1api.BackupRepositoryMaintenanceStatus + expectTimeUpdate *metav1.Time + expectedErr string + }{ + { + name: "wait completion error", + runtimeScheme: schemeFail, + expectedErr: "error waiting incomplete repo maintenance job for repo repo: error listing maintenance job for repo repo: no kind is registered for the type v1.JobList in scheme \"pkg/runtime/scheme.go:100\"", + }, + { + name: "no consolidate result", + runtimeScheme: scheme, + }, + { + name: "no update last time", + runtimeScheme: scheme, + kubeClientObj: []runtime.Object{ + jobSucceeded, + jobPodSucceeded, + }, + repoLastMatainTime: metav1.Time{Time: now.Add(time.Hour * 5)}, + expectNewHistory: []velerov1api.BackupRepositoryMaintenanceStatus{ + { + StartTimestamp: &metav1.Time{Time: now.Add(time.Hour)}, + CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)}, + Result: velerov1api.BackupRepositoryMaintenanceSucceeded, + }, + }, + }, + { + name: "update last time", + runtimeScheme: scheme, + kubeClientObj: []runtime.Object{ + jobSucceeded, + jobPodSucceeded, + }, + expectNewHistory: []velerov1api.BackupRepositoryMaintenanceStatus{ + { + StartTimestamp: &metav1.Time{Time: now.Add(time.Hour)}, + CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)}, + Result: velerov1api.BackupRepositoryMaintenanceSucceeded, + }, + }, + expectTimeUpdate: &metav1.Time{Time: now.Add(time.Hour * 2)}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + r := mockBackupRepoReconciler(t, "", nil, nil) + + backupRepo := mockBackupRepositoryCR() + backupRepo.Status.LastMaintenanceTime = &test.repoLastMatainTime + + test.kubeClientObj = append(test.kubeClientObj, backupRepo) + + fakeClientBuilder := clientFake.NewClientBuilder() + fakeClientBuilder = fakeClientBuilder.WithScheme(test.runtimeScheme) + fakeClient 
:= fakeClientBuilder.WithRuntimeObjects(test.kubeClientObj...).Build() + r.Client = fakeClient + + lastTm := backupRepo.Status.LastMaintenanceTime + + err := r.recallMaintenance(context.TODO(), backupRepo, velerotest.NewLogger()) + if test.expectedErr != "" { + assert.EqualError(t, err, test.expectedErr) + } else { + assert.NoError(t, err) + + if test.expectNewHistory == nil { + assert.Nil(t, backupRepo.Status.RecentMaintenance) + } else { + assert.Len(t, backupRepo.Status.RecentMaintenance, len(test.expectNewHistory)) + for i := 0; i < len(test.expectNewHistory); i++ { + assert.Equal(t, test.expectNewHistory[i].StartTimestamp.Time, backupRepo.Status.RecentMaintenance[i].StartTimestamp.Time) + assert.Equal(t, test.expectNewHistory[i].CompleteTimestamp.Time, backupRepo.Status.RecentMaintenance[i].CompleteTimestamp.Time) + assert.Equal(t, test.expectNewHistory[i].Result, backupRepo.Status.RecentMaintenance[i].Result) + assert.Equal(t, test.expectNewHistory[i].Message, backupRepo.Status.RecentMaintenance[i].Message) + } + } + + if test.expectTimeUpdate != nil { + assert.Equal(t, test.expectTimeUpdate.Time, backupRepo.Status.LastMaintenanceTime.Time) + } else { + assert.Equal(t, lastTm, backupRepo.Status.LastMaintenanceTime) + } + } + }) + } +} + +func TestConsolidateHistory(t *testing.T) { + now := time.Now() + + tests := []struct { + name string + cur []velerov1api.BackupRepositoryMaintenanceStatus + coming []velerov1api.BackupRepositoryMaintenanceStatus + expected []velerov1api.BackupRepositoryMaintenanceStatus + }{ + { + name: "zero coming", + cur: []velerov1api.BackupRepositoryMaintenanceStatus{ + { + StartTimestamp: &metav1.Time{Time: now}, + CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour)}, + Result: velerov1api.BackupRepositoryMaintenanceSucceeded, + Message: "fake-maintenance-message", + }, + }, + expected: nil, + }, + { + name: "identical coming", + cur: []velerov1api.BackupRepositoryMaintenanceStatus{ + { + StartTimestamp: &metav1.Time{Time: now}, + 
CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour)}, + Result: velerov1api.BackupRepositoryMaintenanceSucceeded, + Message: "fake-maintenance-message", + }, + }, + coming: []velerov1api.BackupRepositoryMaintenanceStatus{ + { + StartTimestamp: &metav1.Time{Time: now}, + CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour)}, + }, + }, + expected: nil, + }, + { + name: "less than limit", + cur: []velerov1api.BackupRepositoryMaintenanceStatus{ + { + StartTimestamp: &metav1.Time{Time: now}, + CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour)}, + Result: velerov1api.BackupRepositoryMaintenanceSucceeded, + Message: "fake-maintenance-message", + }, + { + StartTimestamp: &metav1.Time{Time: now.Add(time.Hour)}, + CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)}, + Result: velerov1api.BackupRepositoryMaintenanceFailed, + Message: "fake-maintenance-message-2", + }, + }, + coming: []velerov1api.BackupRepositoryMaintenanceStatus{ + { + StartTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)}, + CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 3)}, + Result: velerov1api.BackupRepositoryMaintenanceSucceeded, + Message: "fake-maintenance-message-3", + }, + }, + expected: []velerov1api.BackupRepositoryMaintenanceStatus{ + { + StartTimestamp: &metav1.Time{Time: now}, + CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour)}, + Result: velerov1api.BackupRepositoryMaintenanceSucceeded, + Message: "fake-maintenance-message", + }, + { + StartTimestamp: &metav1.Time{Time: now.Add(time.Hour)}, + CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)}, + Result: velerov1api.BackupRepositoryMaintenanceFailed, + Message: "fake-maintenance-message-2", + }, + { + StartTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)}, + CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 3)}, + Result: velerov1api.BackupRepositoryMaintenanceSucceeded, + Message: "fake-maintenance-message-3", + }, + }, + }, + { + name: "more than limit", + cur: 
[]velerov1api.BackupRepositoryMaintenanceStatus{ + { + StartTimestamp: &metav1.Time{Time: now}, + CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour)}, + Result: velerov1api.BackupRepositoryMaintenanceSucceeded, + Message: "fake-maintenance-message", + }, + { + StartTimestamp: &metav1.Time{Time: now.Add(time.Hour)}, + CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)}, + Result: velerov1api.BackupRepositoryMaintenanceFailed, + Message: "fake-maintenance-message-2", + }, + }, + coming: []velerov1api.BackupRepositoryMaintenanceStatus{ + { + StartTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)}, + CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 3)}, + Result: velerov1api.BackupRepositoryMaintenanceSucceeded, + Message: "fake-maintenance-message-3", + }, + { + StartTimestamp: &metav1.Time{Time: now.Add(time.Hour * 3)}, + CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 4)}, + Result: velerov1api.BackupRepositoryMaintenanceSucceeded, + Message: "fake-maintenance-message-4", + }, + }, + expected: []velerov1api.BackupRepositoryMaintenanceStatus{ + { + StartTimestamp: &metav1.Time{Time: now.Add(time.Hour)}, + CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)}, + Result: velerov1api.BackupRepositoryMaintenanceFailed, + Message: "fake-maintenance-message-2", + }, + { + StartTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)}, + CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 3)}, + Result: velerov1api.BackupRepositoryMaintenanceSucceeded, + Message: "fake-maintenance-message-3", + }, + { + StartTimestamp: &metav1.Time{Time: now.Add(time.Hour * 3)}, + CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 4)}, + Result: velerov1api.BackupRepositoryMaintenanceSucceeded, + Message: "fake-maintenance-message-4", + }, + }, + }, + { + name: "more than limit 2", + cur: []velerov1api.BackupRepositoryMaintenanceStatus{ + { + StartTimestamp: &metav1.Time{Time: now}, + CompleteTimestamp: &metav1.Time{Time: 
now.Add(time.Hour)}, + Result: velerov1api.BackupRepositoryMaintenanceSucceeded, + Message: "fake-maintenance-message", + }, + { + StartTimestamp: &metav1.Time{Time: now.Add(time.Hour)}, + CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)}, + Result: velerov1api.BackupRepositoryMaintenanceFailed, + Message: "fake-maintenance-message-2", + }, + { + StartTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)}, + CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 3)}, + Result: velerov1api.BackupRepositoryMaintenanceSucceeded, + Message: "fake-maintenance-message-3", + }, + }, + coming: []velerov1api.BackupRepositoryMaintenanceStatus{ + { + StartTimestamp: &metav1.Time{Time: now.Add(time.Hour)}, + CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)}, + Result: velerov1api.BackupRepositoryMaintenanceFailed, + Message: "fake-maintenance-message-2", + }, + { + StartTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)}, + CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 3)}, + Result: velerov1api.BackupRepositoryMaintenanceSucceeded, + Message: "fake-maintenance-message-3", + }, + { + StartTimestamp: &metav1.Time{Time: now.Add(time.Hour * 3)}, + CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 4)}, + Result: velerov1api.BackupRepositoryMaintenanceSucceeded, + Message: "fake-maintenance-message-4", + }, + }, + expected: []velerov1api.BackupRepositoryMaintenanceStatus{ + { + StartTimestamp: &metav1.Time{Time: now.Add(time.Hour)}, + CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)}, + Result: velerov1api.BackupRepositoryMaintenanceFailed, + Message: "fake-maintenance-message-2", + }, + { + StartTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)}, + CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 3)}, + Result: velerov1api.BackupRepositoryMaintenanceSucceeded, + Message: "fake-maintenance-message-3", + }, + { + StartTimestamp: &metav1.Time{Time: now.Add(time.Hour * 3)}, + CompleteTimestamp: &metav1.Time{Time: 
now.Add(time.Hour * 4)}, + Result: velerov1api.BackupRepositoryMaintenanceSucceeded, + Message: "fake-maintenance-message-4", + }, + }, + }, + { + name: "coming is not used", + cur: []velerov1api.BackupRepositoryMaintenanceStatus{ + { + StartTimestamp: &metav1.Time{Time: now.Add(time.Hour)}, + CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)}, + Result: velerov1api.BackupRepositoryMaintenanceSucceeded, + Message: "fake-maintenance-message", + }, + { + StartTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)}, + CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 3)}, + Result: velerov1api.BackupRepositoryMaintenanceSucceeded, + Message: "fake-maintenance-message-3", + }, + { + StartTimestamp: &metav1.Time{Time: now.Add(time.Hour * 3)}, + CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 4)}, + Result: velerov1api.BackupRepositoryMaintenanceSucceeded, + Message: "fake-maintenance-message-4", + }, + }, + coming: []velerov1api.BackupRepositoryMaintenanceStatus{ + { + StartTimestamp: &metav1.Time{Time: now}, + CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour)}, + }, + }, + expected: nil, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + consolidated := consolidateHistory(test.coming, test.cur) + if test.expected == nil { + assert.Nil(t, consolidated) + } else { + assert.Len(t, consolidated, len(test.expected)) + for i := 0; i < len(test.expected); i++ { + assert.Equal(t, *test.expected[i].StartTimestamp, *consolidated[i].StartTimestamp) + assert.Equal(t, *test.expected[i].CompleteTimestamp, *consolidated[i].CompleteTimestamp) + assert.Equal(t, test.expected[i].Result, consolidated[i].Result) + assert.Equal(t, test.expected[i].Message, consolidated[i].Message) + } + + assert.Nil(t, consolidateHistory(test.coming, consolidated)) + } + }) + } +} + +func TestGetLastMaintenanceTimeFromHistory(t *testing.T) { + now := time.Now() + + tests := []struct { + name string + history 
[]velerov1api.BackupRepositoryMaintenanceStatus + expected time.Time + }{ + { + name: "first one is nil", + history: []velerov1api.BackupRepositoryMaintenanceStatus{ + { + StartTimestamp: &metav1.Time{Time: now}, + Result: velerov1api.BackupRepositoryMaintenanceSucceeded, + Message: "fake-maintenance-message", + }, + { + StartTimestamp: &metav1.Time{Time: now.Add(time.Hour)}, + CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)}, + Result: velerov1api.BackupRepositoryMaintenanceFailed, + Message: "fake-maintenance-message-2", + }, + { + StartTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)}, + CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 3)}, + Result: velerov1api.BackupRepositoryMaintenanceSucceeded, + Message: "fake-maintenance-message-3", + }, + }, + expected: now.Add(time.Hour * 3), + }, + { + name: "another one is nil", + history: []velerov1api.BackupRepositoryMaintenanceStatus{ + { + StartTimestamp: &metav1.Time{Time: now}, + CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour)}, + Result: velerov1api.BackupRepositoryMaintenanceSucceeded, + Message: "fake-maintenance-message", + }, + { + StartTimestamp: &metav1.Time{Time: now.Add(time.Hour)}, + Result: velerov1api.BackupRepositoryMaintenanceFailed, + Message: "fake-maintenance-message-2", + }, + { + StartTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)}, + CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 3)}, + Result: velerov1api.BackupRepositoryMaintenanceSucceeded, + Message: "fake-maintenance-message-3", + }, + }, + expected: now.Add(time.Hour * 3), + }, + { + name: "disordered", + history: []velerov1api.BackupRepositoryMaintenanceStatus{ + { + StartTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)}, + CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 3)}, + Result: velerov1api.BackupRepositoryMaintenanceSucceeded, + Message: "fake-maintenance-message-3", + }, + { + StartTimestamp: &metav1.Time{Time: now}, + CompleteTimestamp: &metav1.Time{Time: 
now.Add(time.Hour)}, + Result: velerov1api.BackupRepositoryMaintenanceSucceeded, + Message: "fake-maintenance-message", + }, + { + StartTimestamp: &metav1.Time{Time: now.Add(time.Hour)}, + Result: velerov1api.BackupRepositoryMaintenanceFailed, + Message: "fake-maintenance-message-2", + }, + }, + expected: now.Add(time.Hour * 3), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + time := getLastMaintenanceTimeFromHistory(test.history) + assert.Equal(t, test.expected, time.Time) + }) + } +} diff --git a/pkg/repository/maintenance.go b/pkg/repository/maintenance.go deleted file mode 100644 index 5ea63979f0..0000000000 --- a/pkg/repository/maintenance.go +++ /dev/null @@ -1,252 +0,0 @@ -/* -Copyright the Velero contributors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package repository - -import ( - "context" - "encoding/json" - "fmt" - "sort" - "time" - - "github.com/pkg/errors" - "github.com/sirupsen/logrus" - batchv1 "k8s.io/api/batch/v1" - v1 "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/wait" - "sigs.k8s.io/controller-runtime/pkg/client" - - velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" - "github.com/vmware-tanzu/velero/pkg/util/kube" -) - -const ( - RepositoryNameLabel = "velero.io/repo-name" - GlobalKeyForRepoMaintenanceJobCM = "global" -) - -type JobConfigs struct { - // LoadAffinities is the config for repository maintenance job load affinity. - LoadAffinities []*kube.LoadAffinity `json:"loadAffinity,omitempty"` - - // PodResources is the config for the CPU and memory resources setting. - PodResources *kube.PodResources `json:"podResources,omitempty"` -} - -func GenerateJobName(repo string) string { - millisecond := time.Now().UTC().UnixMilli() // millisecond - - jobName := fmt.Sprintf("%s-maintain-job-%d", repo, millisecond) - if len(jobName) > 63 { // k8s job name length limit - jobName = fmt.Sprintf("repo-maintain-job-%d", millisecond) - } - - return jobName -} - -// DeleteOldMaintenanceJobs deletes old maintenance jobs and keeps the latest N jobs -func DeleteOldMaintenanceJobs(cli client.Client, repo string, keep int) error { - // Get the maintenance job list by label - jobList := &batchv1.JobList{} - err := cli.List(context.TODO(), jobList, client.MatchingLabels(map[string]string{RepositoryNameLabel: repo})) - if err != nil { - return err - } - - // Delete old maintenance jobs - if len(jobList.Items) > keep { - sort.Slice(jobList.Items, func(i, j int) bool { - return jobList.Items[i].CreationTimestamp.Before(&jobList.Items[j].CreationTimestamp) - }) - for i := 0; i < len(jobList.Items)-keep; i++ { - err = cli.Delete(context.TODO(), &jobList.Items[i], 
client.PropagationPolicy(metav1.DeletePropagationBackground)) - if err != nil { - return err - } - } - } - - return nil -} - -func WaitForJobComplete(ctx context.Context, client client.Client, job *batchv1.Job) error { - return wait.PollUntilContextCancel(ctx, 1, true, func(ctx context.Context) (bool, error) { - err := client.Get(ctx, types.NamespacedName{Namespace: job.Namespace, Name: job.Name}, job) - if err != nil && !apierrors.IsNotFound(err) { - return false, err - } - - if job.Status.Succeeded > 0 { - return true, nil - } - - if job.Status.Failed > 0 { - return true, fmt.Errorf("maintenance job %s/%s failed", job.Namespace, job.Name) - } - return false, nil - }) -} - -func GetMaintenanceResultFromJob(cli client.Client, job *batchv1.Job) (string, error) { - // Get the maintenance job related pod by label selector - podList := &v1.PodList{} - err := cli.List(context.TODO(), podList, client.InNamespace(job.Namespace), client.MatchingLabels(map[string]string{"job-name": job.Name})) - if err != nil { - return "", err - } - - if len(podList.Items) == 0 { - return "", fmt.Errorf("no pod found for job %s", job.Name) - } - - // we only have one maintenance pod for the job - pod := podList.Items[0] - - statuses := pod.Status.ContainerStatuses - if len(statuses) == 0 { - return "", fmt.Errorf("no container statuses found for job %s", job.Name) - } - - // we only have one maintenance container - terminated := statuses[0].State.Terminated - if terminated == nil { - return "", fmt.Errorf("container for job %s is not terminated", job.Name) - } - - return terminated.Message, nil -} - -func GetLatestMaintenanceJob(cli client.Client, ns string) (*batchv1.Job, error) { - // Get the maintenance job list by label - jobList := &batchv1.JobList{} - err := cli.List(context.TODO(), jobList, &client.ListOptions{ - Namespace: ns, - }, - &client.HasLabels{RepositoryNameLabel}, - ) - - if err != nil { - return nil, err - } - - if len(jobList.Items) == 0 { - return nil, nil - } - - // 
Get the latest maintenance job - sort.Slice(jobList.Items, func(i, j int) bool { - return jobList.Items[i].CreationTimestamp.Time.After(jobList.Items[j].CreationTimestamp.Time) - }) - - return &jobList.Items[0], nil -} - -// GetMaintenanceJobConfig is called to get the Maintenance Job Config for the -// BackupRepository specified by the repo parameter. -// -// Params: -// -// ctx: the Go context used for controller-runtime client. -// client: the controller-runtime client. -// logger: the logger. -// veleroNamespace: the Velero-installed namespace. It's used to retrieve the BackupRepository. -// repoMaintenanceJobConfig: the repository maintenance job ConfigMap name. -// repo: the BackupRepository needs to run the maintenance Job. -func GetMaintenanceJobConfig( - ctx context.Context, - client client.Client, - logger logrus.FieldLogger, - veleroNamespace string, - repoMaintenanceJobConfig string, - repo *velerov1api.BackupRepository, -) (*JobConfigs, error) { - var cm v1.ConfigMap - if err := client.Get( - ctx, - types.NamespacedName{ - Namespace: veleroNamespace, - Name: repoMaintenanceJobConfig, - }, - &cm, - ); err != nil { - if apierrors.IsNotFound(err) { - return nil, nil - } else { - return nil, errors.Wrapf( - err, - "fail to get repo maintenance job configs %s", repoMaintenanceJobConfig) - } - } - - if cm.Data == nil { - return nil, errors.Errorf("data is not available in config map %s", repoMaintenanceJobConfig) - } - - // Generate the BackupRepository key. - // If using the BackupRepository name as the is more intuitive, - // but the BackupRepository generation is dynamic. We cannot assume - // they are ready when installing Velero. - // Instead we use the volume source namespace, BSL name, and the uploader - // type to represent the BackupRepository. The combination of those three - // keys can identify a unique BackupRepository. 
- repoJobConfigKey := repo.Spec.VolumeNamespace + "-" + - repo.Spec.BackupStorageLocation + "-" + repo.Spec.RepositoryType - - var result *JobConfigs - if _, ok := cm.Data[repoJobConfigKey]; ok { - logger.Debugf("Find the repo maintenance config %s for repo %s", repoJobConfigKey, repo.Name) - result = new(JobConfigs) - if err := json.Unmarshal([]byte(cm.Data[repoJobConfigKey]), result); err != nil { - return nil, errors.Wrapf( - err, - "fail to unmarshal configs from %s's key %s", - repoMaintenanceJobConfig, - repoJobConfigKey) - } - } - - if _, ok := cm.Data[GlobalKeyForRepoMaintenanceJobCM]; ok { - logger.Debugf("Find the global repo maintenance config for repo %s", repo.Name) - - if result == nil { - result = new(JobConfigs) - } - - globalResult := new(JobConfigs) - - if err := json.Unmarshal([]byte(cm.Data[GlobalKeyForRepoMaintenanceJobCM]), globalResult); err != nil { - return nil, errors.Wrapf( - err, - "fail to unmarshal configs from %s's key %s", - repoMaintenanceJobConfig, - GlobalKeyForRepoMaintenanceJobCM) - } - - if result.PodResources == nil && globalResult.PodResources != nil { - result.PodResources = globalResult.PodResources - } - - if len(result.LoadAffinities) == 0 { - result.LoadAffinities = globalResult.LoadAffinities - } - } - - return result, nil -} diff --git a/pkg/repository/maintenance/maintenance.go b/pkg/repository/maintenance/maintenance.go new file mode 100644 index 0000000000..9694dd4dfb --- /dev/null +++ b/pkg/repository/maintenance/maintenance.go @@ -0,0 +1,523 @@ +/* +Copyright the Velero contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package maintenance + +import ( + "context" + "encoding/json" + "fmt" + "math" + "sort" + "time" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + batchv1 "k8s.io/api/batch/v1" + v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "sigs.k8s.io/controller-runtime/pkg/client" + + velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + "github.com/vmware-tanzu/velero/pkg/util/kube" + + appsv1 "k8s.io/api/apps/v1" + + veleroutil "github.com/vmware-tanzu/velero/pkg/util/velero" + + "github.com/vmware-tanzu/velero/pkg/util/logging" +) + +const ( + RepositoryNameLabel = "velero.io/repo-name" + GlobalKeyForRepoMaintenanceJobCM = "global" +) + +type JobConfigs struct { + // LoadAffinities is the config for repository maintenance job load affinity. + LoadAffinities []*kube.LoadAffinity `json:"loadAffinity,omitempty"` + + // PodResources is the config for the CPU and memory resources setting. 
+ PodResources *kube.PodResources `json:"podResources,omitempty"` +} + +func GenerateJobName(repo string) string { + millisecond := time.Now().UTC().UnixMilli() // millisecond + + jobName := fmt.Sprintf("%s-maintain-job-%d", repo, millisecond) + if len(jobName) > 63 { // k8s job name length limit + jobName = fmt.Sprintf("repo-maintain-job-%d", millisecond) + } + + return jobName +} + +// DeleteOldJobs deletes old maintenance jobs and keeps the latest N jobs +func DeleteOldJobs(cli client.Client, repo string, keep int) error { + // Get the maintenance job list by label + jobList := &batchv1.JobList{} + err := cli.List(context.TODO(), jobList, client.MatchingLabels(map[string]string{RepositoryNameLabel: repo})) + if err != nil { + return err + } + + // Delete old maintenance jobs + if len(jobList.Items) > keep { + sort.Slice(jobList.Items, func(i, j int) bool { + return jobList.Items[i].CreationTimestamp.Before(&jobList.Items[j].CreationTimestamp) + }) + for i := 0; i < len(jobList.Items)-keep; i++ { + err = cli.Delete(context.TODO(), &jobList.Items[i], client.PropagationPolicy(metav1.DeletePropagationBackground)) + if err != nil { + return err + } + } + } + + return nil +} + +var waitCompletionBackOff = wait.Backoff{ + Duration: time.Minute * 20, + Steps: math.MaxInt, + Factor: 2, + Cap: time.Hour * 12, +} + +// waitForJobComplete wait for completion of the specified job and update the latest job object +func waitForJobComplete(ctx context.Context, client client.Client, ns string, job string, logger logrus.FieldLogger) (*batchv1.Job, error) { + var ret *batchv1.Job + + backOff := waitCompletionBackOff + + startTime := time.Now() + nextCheckpoint := startTime.Add(backOff.Step()) + + err := wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) { + updated := &batchv1.Job{} + err := client.Get(ctx, types.NamespacedName{Namespace: ns, Name: job}, updated) + if err != nil && !apierrors.IsNotFound(err) { + return false, err + } + + 
ret = updated + + if updated.Status.Succeeded > 0 { + return true, nil + } + + if updated.Status.Failed > 0 { + return true, nil + } + + now := time.Now() + if now.After(nextCheckpoint) { + logger.Warnf("Repo maintenance job %s has lasted %v minutes", job, now.Sub(startTime).Minutes()) + nextCheckpoint = now.Add(backOff.Step()) + } + + return false, nil + }) + + return ret, err +} + +func getResultFromJob(cli client.Client, job *batchv1.Job) (string, error) { + // Get the maintenance job related pod by label selector + podList := &v1.PodList{} + err := cli.List(context.TODO(), podList, client.InNamespace(job.Namespace), client.MatchingLabels(map[string]string{"job-name": job.Name})) + if err != nil { + return "", err + } + + if len(podList.Items) == 0 { + return "", fmt.Errorf("no pod found for job %s", job.Name) + } + + // we only have one maintenance pod for the job + pod := podList.Items[0] + + statuses := pod.Status.ContainerStatuses + if len(statuses) == 0 { + return "", fmt.Errorf("no container statuses found for job %s", job.Name) + } + + // we only have one maintenance container + terminated := statuses[0].State.Terminated + if terminated == nil { + return "", fmt.Errorf("container for job %s is not terminated", job.Name) + } + + return terminated.Message, nil +} + +// getJobConfig is called to get the Maintenance Job Config for the +// BackupRepository specified by the repo parameter. +// +// Params: +// +// ctx: the Go context used for controller-runtime client. +// client: the controller-runtime client. +// logger: the logger. +// veleroNamespace: the Velero-installed namespace. It's used to retrieve the BackupRepository. +// repoMaintenanceJobConfig: the repository maintenance job ConfigMap name. +// repo: the BackupRepository needs to run the maintenance Job. 
+func getJobConfig( + ctx context.Context, + client client.Client, + logger logrus.FieldLogger, + veleroNamespace string, + repoMaintenanceJobConfig string, + repo *velerov1api.BackupRepository, +) (*JobConfigs, error) { + var cm v1.ConfigMap + if err := client.Get( + ctx, + types.NamespacedName{ + Namespace: veleroNamespace, + Name: repoMaintenanceJobConfig, + }, + &cm, + ); err != nil { + if apierrors.IsNotFound(err) { + return nil, nil + } else { + return nil, errors.Wrapf( + err, + "fail to get repo maintenance job configs %s", repoMaintenanceJobConfig) + } + } + + if cm.Data == nil { + return nil, errors.Errorf("data is not available in config map %s", repoMaintenanceJobConfig) + } + + // Generate the BackupRepository key. + // If using the BackupRepository name as the is more intuitive, + // but the BackupRepository generation is dynamic. We cannot assume + // they are ready when installing Velero. + // Instead we use the volume source namespace, BSL name, and the uploader + // type to represent the BackupRepository. The combination of those three + // keys can identify a unique BackupRepository. 
+ repoJobConfigKey := repo.Spec.VolumeNamespace + "-" + + repo.Spec.BackupStorageLocation + "-" + repo.Spec.RepositoryType + + var result *JobConfigs + if _, ok := cm.Data[repoJobConfigKey]; ok { + logger.Debugf("Find the repo maintenance config %s for repo %s", repoJobConfigKey, repo.Name) + result = new(JobConfigs) + if err := json.Unmarshal([]byte(cm.Data[repoJobConfigKey]), result); err != nil { + return nil, errors.Wrapf( + err, + "fail to unmarshal configs from %s's key %s", + repoMaintenanceJobConfig, + repoJobConfigKey) + } + } + + if _, ok := cm.Data[GlobalKeyForRepoMaintenanceJobCM]; ok { + logger.Debugf("Find the global repo maintenance config for repo %s", repo.Name) + + if result == nil { + result = new(JobConfigs) + } + + globalResult := new(JobConfigs) + + if err := json.Unmarshal([]byte(cm.Data[GlobalKeyForRepoMaintenanceJobCM]), globalResult); err != nil { + return nil, errors.Wrapf( + err, + "fail to unmarshal configs from %s's key %s", + repoMaintenanceJobConfig, + GlobalKeyForRepoMaintenanceJobCM) + } + + if result.PodResources == nil && globalResult.PodResources != nil { + result.PodResources = globalResult.PodResources + } + + if len(result.LoadAffinities) == 0 { + result.LoadAffinities = globalResult.LoadAffinities + } + } + + return result, nil +} + +// WaitJobComplete waits the completion of the specified maintenance job and return the BackupRepositoryMaintenanceStatus +func WaitJobComplete(cli client.Client, ctx context.Context, jobName, ns string, logger logrus.FieldLogger) (velerov1api.BackupRepositoryMaintenanceStatus, error) { + log := logger.WithField("job name", jobName) + + maintenanceJob, err := waitForJobComplete(ctx, cli, ns, jobName, logger) + if err != nil { + return velerov1api.BackupRepositoryMaintenanceStatus{}, errors.Wrap(err, "error to wait for maintenance job complete") + } + + log.Infof("Maintenance repo complete, succeeded %v, failed %v", maintenanceJob.Status.Succeeded, maintenanceJob.Status.Failed) + + result := "" + 
if maintenanceJob.Status.Failed > 0 {
+		if r, err := getResultFromJob(cli, maintenanceJob); err != nil {
+			log.WithError(err).Warn("Failed to get maintenance job result")
+			result = "Repo maintenance failed but result is not retrieveable"
+		} else {
+			result = r
+		}
+	}
+
+	return composeStatusFromJob(maintenanceJob, result), nil
+}
+
+// WaitAllJobsComplete lists the maintenance jobs labeled for repo in repo's namespace,
+// orders them by creation time (oldest first) and reports the status of the newest
+// `limit` of them. Any job in that window that has neither succeeded nor failed yet is
+// waited for before its status is composed; jobs older than the window are not waited for.
+//
+// It returns (nil, nil) when no job exists for the repo, and an empty, non-nil slice when
+// limit is zero or negative. The listing and the waiting both honor ctx, so a cancelled
+// or expired context aborts the call with an error.
+func WaitAllJobsComplete(ctx context.Context, cli client.Client, repo *velerov1api.BackupRepository, limit int, log logrus.FieldLogger) ([]velerov1api.BackupRepositoryMaintenanceStatus, error) {
+	jobList := &batchv1.JobList{}
+	// Use the caller's context (not context.TODO()) so that cancellation also covers the list call.
+	err := cli.List(ctx, jobList, &client.ListOptions{
+		Namespace: repo.Namespace,
+	},
+		client.MatchingLabels(map[string]string{RepositoryNameLabel: repo.Name}),
+	)
+
+	if err != nil {
+		return nil, errors.Wrapf(err, "error listing maintenance job for repo %s", repo.Name)
+	}
+
+	if len(jobList.Items) == 0 {
+		return nil, nil
+	}
+
+	sort.Slice(jobList.Items, func(i, j int) bool {
+		return jobList.Items[i].CreationTimestamp.Time.Before(jobList.Items[j].CreationTimestamp.Time)
+	})
+
+	// Only the newest `limit` jobs are reported; clamp the window to the slice bounds so
+	// that neither a large nor a negative limit can index (or size) out of range.
+	startPos := len(jobList.Items) - limit
+	if startPos < 0 {
+		startPos = 0
+	}
+	if startPos > len(jobList.Items) {
+		startPos = len(jobList.Items)
+	}
+
+	history := make([]velerov1api.BackupRepositoryMaintenanceStatus, 0, len(jobList.Items)-startPos)
+
+	for i := startPos; i < len(jobList.Items); i++ {
+		job := &jobList.Items[i]
+
+		if job.Status.Succeeded == 0 && job.Status.Failed == 0 {
+			log.Infof("Waiting for maintenance job %s to complete", job.Name)
+
+			updated, err := waitForJobComplete(ctx, cli, job.Namespace, job.Name, log)
+			if err != nil {
+				return nil, errors.Wrapf(err, "error waiting maintenance job[%s] complete", job.Name)
+			}
+
+			// Continue with the refreshed object so the status reflects the finished job.
+			job = updated
+		}
+
+		message := ""
+		if job.Status.Failed > 0 {
+			// A failure to fetch the result is not fatal: record a placeholder message instead.
+			if msg, err := getResultFromJob(cli, job); err != nil {
+				log.WithError(err).Warnf("Failed to get result of maintenance job %s", job.Name)
+				message = "Repo maintenance failed but result is not retrieveable"
+			} else {
+				message = msg
+			}
+		}
+
+		history = append(history, composeStatusFromJob(job, message))
+	}
+
+	return history, nil
+}
+
+// StartNewJob creates a new maintenance job for repo and returns the name of the created job.
+//
+// The BackupStorageLocation referenced by repo.Spec.BackupStorageLocation is fetched first;
+// failing to get it is an error. A failure to load the job configuration named by
+// repoMaintenanceJobConfig is only logged as a warning, and the job is then built from
+// whatever getJobConfig returned together with podResources.
+func StartNewJob(cli client.Client, ctx context.Context, repo *velerov1api.BackupRepository, repoMaintenanceJobConfig string,
+	podResources kube.PodResources, logLevel logrus.Level, logFormat *logging.FormatFlag, logger logrus.FieldLogger) (string, error) {
+	bsl := &velerov1api.BackupStorageLocation{}
+	if err := cli.Get(ctx, client.ObjectKey{Namespace: repo.Namespace, Name: repo.Spec.BackupStorageLocation}, bsl); err != nil {
+		return "", errors.WithStack(err)
+	}
+
+	log := logger.WithFields(logrus.Fields{
+		"BSL name":  bsl.Name,
+		"repo type": repo.Spec.RepositoryType,
+		"repo name": repo.Name,
+		"repo UID":  repo.UID,
+	})
+
+	jobConfig, err := getJobConfig(
+		ctx,
+		cli,
+		log,
+		repo.Namespace,
+		repoMaintenanceJobConfig,
+		repo,
+	)
+	if err != nil {
+		// Deliberately best-effort: a missing or broken config must not block maintenance.
+		log.Warnf("Fail to find the ConfigMap %s to build maintenance job with error: %s. Use default value.",
+			repo.Namespace+"/"+repoMaintenanceJobConfig,
+			err.Error(),
+		)
+	}
+
+	log.Info("Starting maintenance repo")
+
+	maintenanceJob, err := buildJob(cli, ctx, repo, bsl.Name, jobConfig, podResources, logLevel, logFormat)
+	if err != nil {
+		return "", errors.Wrap(err, "error to build maintenance job")
+	}
+
+	log = log.WithField("job", fmt.Sprintf("%s/%s", maintenanceJob.Namespace, maintenanceJob.Name))
+
+	// Create with the caller's context (not context.TODO()) so cancellation is honored,
+	// consistent with the Get and buildJob calls above.
+	if err := cli.Create(ctx, maintenanceJob); err != nil {
+		return "", errors.Wrap(err, "error to create maintenance job")
+	}
+
+	log.Info("Repo maintenance job started")
+
+	return maintenanceJob.Name, nil
+}
+
+// buildJob assembles (but does not create) the batch Job that runs "velero repo-maintenance"
+// for repo against the backup storage location named bslName.
+//
+// The pod settings (env, envFrom, volumes, volume mounts, service account, image,
+// tolerations, node selector, labels, annotations) are read from the Deployment named
+// "velero" in repo's namespace; an error is returned if that Deployment cannot be fetched.
+// Resource requests/limits come from podResources unless config carries its own
+// PodResources, in which case all four values are taken from config. config may be nil.
+func buildJob(cli client.Client, ctx context.Context, repo *velerov1api.BackupRepository, bslName string, config *JobConfigs,
+	podResources kube.PodResources, logLevel logrus.Level, logFormat *logging.FormatFlag) (*batchv1.Job, error) {
+	// Get the Velero server deployment
+	deployment := &appsv1.Deployment{}
+	err := cli.Get(ctx, types.NamespacedName{Name: "velero", Namespace: repo.Namespace}, deployment)
+	if err != nil {
+		return nil, err
+	}
+
+	// Get the environment variables from the Velero server deployment
+	envVars := veleroutil.GetEnvVarsFromVeleroServer(deployment)
+
+	// Get the referenced storage from the Velero server deployment
+	envFromSources := veleroutil.GetEnvFromSourcesFromVeleroServer(deployment)
+
+	// Get the volume mounts from the Velero server deployment
+	volumeMounts := veleroutil.GetVolumeMountsFromVeleroServer(deployment)
+
+	// Get the volumes from the Velero server deployment
+	volumes := veleroutil.GetVolumesFromVeleroServer(deployment)
+
+	// Get the service account from the Velero server deployment
+	serviceAccount := veleroutil.GetServiceAccountFromVeleroServer(deployment)
+
+	// Get image
+	image := veleroutil.GetVeleroServerImage(deployment)
+
+	// Set resource limits and requests; a PodResources entry in config replaces
+	// the server-level values as a whole rather than field by field.
+	cpuRequest := podResources.CPURequest
+	memRequest := podResources.MemoryRequest
+	cpuLimit := podResources.CPULimit
+	memLimit := podResources.MemoryLimit
+	if config != nil && config.PodResources != nil {
+		cpuRequest = config.PodResources.CPURequest
+		memRequest = config.PodResources.MemoryRequest
+		cpuLimit = config.PodResources.CPULimit
+		memLimit = config.PodResources.MemoryLimit
+	}
+	resources, err := kube.ParseResourceRequirements(cpuRequest, memRequest, cpuLimit, memLimit)
+	if err != nil {
+		return nil, errors.Wrap(err, "failed to parse resource requirements for maintenance job")
+	}
+
+	// Set arguments; --repo-name carries the volume namespace of the repository.
+	args := []string{
+		"repo-maintenance",
+		fmt.Sprintf("--repo-name=%s", repo.Spec.VolumeNamespace),
+		fmt.Sprintf("--repo-type=%s", repo.Spec.RepositoryType),
+		fmt.Sprintf("--backup-storage-location=%s", bslName),
+		fmt.Sprintf("--log-level=%s", logLevel.String()),
+		fmt.Sprintf("--log-format=%s", logFormat.String()),
+	}
+
+	// build the maintenance job
+	job := &batchv1.Job{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      GenerateJobName(repo.Name),
+			Namespace: repo.Namespace,
+			Labels: map[string]string{
+				RepositoryNameLabel: repo.Name,
+			},
+		},
+		Spec: batchv1.JobSpec{
+			BackoffLimit: new(int32), // Never retry
+			Template: v1.PodTemplateSpec{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "velero-repo-maintenance-pod",
+					Labels: map[string]string{
+						RepositoryNameLabel: repo.Name,
+					},
+				},
+				Spec: v1.PodSpec{
+					Containers: []v1.Container{
+						{
+							Name:  "velero-repo-maintenance-container",
+							Image: image,
+							Command: []string{
+								"/velero",
+							},
+							Args:            args,
+							ImagePullPolicy: v1.PullIfNotPresent,
+							Env:             envVars,
+							EnvFrom:         envFromSources,
+							VolumeMounts:    volumeMounts,
+							Resources:       resources,
+						},
+					},
+					RestartPolicy:      v1.RestartPolicyNever,
+					Volumes:            volumes,
+					ServiceAccountName: serviceAccount,
+				},
+			},
+		},
+	}
+
+	if config != nil && len(config.LoadAffinities) > 0 {
+		affinity := kube.ToSystemAffinity(config.LoadAffinities)
+		job.Spec.Template.Spec.Affinity = affinity
+	}
+
+	if tolerations := veleroutil.GetTolerationsFromVeleroServer(deployment); tolerations != nil {
+		job.Spec.Template.Spec.Tolerations = tolerations
+	}
+
+	if nodeSelector := veleroutil.GetNodeSelectorFromVeleroServer(deployment); nodeSelector != nil {
+		job.Spec.Template.Spec.NodeSelector = nodeSelector
+	}
+
+	// NOTE(review): this replaces the pod template labels set above, so the
+	// RepositoryNameLabel on the pod is dropped whenever the server has labels,
+	// unless they already contain it — confirm whether a merge is intended.
+	if labels := veleroutil.GetVeleroServerLables(deployment); len(labels) > 0 {
+		job.Spec.Template.Labels = labels
+	}
+
+	if annotations := veleroutil.GetVeleroServerAnnotations(deployment); len(annotations) > 0 {
+		job.Spec.Template.Annotations = annotations
+	}
+
+	return job, nil
+}
+
+// composeStatusFromJob converts a finished maintenance job into a maintenance status entry.
+// The result is Failed when the job reports at least one failed pod and Succeeded otherwise;
+// the start timestamp is the job's creation time and message is stored as given.
+// CompleteTimestamp is nil when no completion time can be determined from the job.
+func composeStatusFromJob(job *batchv1.Job, message string) velerov1api.BackupRepositoryMaintenanceStatus {
+	result := velerov1api.BackupRepositoryMaintenanceSucceeded
+	if job.Status.Failed > 0 {
+		result = velerov1api.BackupRepositoryMaintenanceFailed
+	}
+
+	return velerov1api.BackupRepositoryMaintenanceStatus{
+		Result:            result,
+		StartTimestamp:    &metav1.Time{Time: job.CreationTimestamp.Time},
+		CompleteTimestamp: getJobCompletionTime(job),
+		Message:           message,
+	}
+}
+
+// getJobCompletionTime returns a copy of the time at which the job finished, or nil if
+// that cannot be determined. Status.CompletionTime is a pointer that Kubernetes leaves
+// unset for jobs that did not finish successfully, so it must not be dereferenced
+// blindly; for such jobs the transition time of a true JobFailed condition is used.
+func getJobCompletionTime(job *batchv1.Job) *metav1.Time {
+	if job.Status.CompletionTime != nil {
+		return &metav1.Time{Time: job.Status.CompletionTime.Time}
+	}
+
+	for i := range job.Status.Conditions {
+		cond := &job.Status.Conditions[i]
+		if cond.Type == batchv1.JobFailed && cond.Status == v1.ConditionTrue {
+			return &metav1.Time{Time: cond.LastTransitionTime.Time}
+		}
+	}
+
+	return nil
+}
diff --git a/pkg/repository/maintenance/maintenance_test.go b/pkg/repository/maintenance/maintenance_test.go
new file mode 100644
index 0000000000..bfe700ce86
--- /dev/null
+++ b/pkg/repository/maintenance/maintenance_test.go
@@ -0,0 +1,1010 @@
+/*
+Copyright the Velero contributors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/ + +package maintenance + +import ( + "context" + "fmt" + "math" + "strings" + "testing" + "time" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + batchv1 "k8s.io/api/batch/v1" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + "github.com/vmware-tanzu/velero/pkg/builder" + "github.com/vmware-tanzu/velero/pkg/repository/provider" + velerotest "github.com/vmware-tanzu/velero/pkg/test" + "github.com/vmware-tanzu/velero/pkg/util/kube" + "github.com/vmware-tanzu/velero/pkg/util/logging" + + appsv1 "k8s.io/api/apps/v1" +) + +func TestGenerateJobName1(t *testing.T) { + testCases := []struct { + repo string + expectedStart string + }{ + { + repo: "example", + expectedStart: "example-maintain-job-", + }, + { + repo: strings.Repeat("a", 60), + expectedStart: "repo-maintain-job-", + }, + } + + for _, tc := range testCases { + t.Run(tc.repo, func(t *testing.T) { + // Call the function to test + jobName := GenerateJobName(tc.repo) + + // Check if the generated job name starts with the expected prefix + if !strings.HasPrefix(jobName, tc.expectedStart) { + t.Errorf("generated job name does not start with expected prefix") + } + + // Check if the length of the generated job name exceeds the Kubernetes limit + if len(jobName) > 63 { + t.Errorf("generated job name exceeds Kubernetes limit") + } + }) + } +} +func TestDeleteOldJobs(t *testing.T) { + // Set up test repo and keep value + repo := "test-repo" + keep := 2 + + // Create some maintenance jobs for testing + var objs []client.Object + // Create a newer job + newerJob := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "job1", + Namespace: "default", + 
Labels: map[string]string{RepositoryNameLabel: repo}, + }, + Spec: batchv1.JobSpec{}, + } + objs = append(objs, newerJob) + // Create older jobs + for i := 2; i <= 3; i++ { + olderJob := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("job%d", i), + Namespace: "default", + Labels: map[string]string{RepositoryNameLabel: repo}, + CreationTimestamp: metav1.Time{ + Time: metav1.Now().Add(time.Duration(-24*i) * time.Hour), + }, + }, + Spec: batchv1.JobSpec{}, + } + objs = append(objs, olderJob) + } + // Create a fake Kubernetes client + scheme := runtime.NewScheme() + _ = batchv1.AddToScheme(scheme) + cli := fake.NewClientBuilder().WithScheme(scheme).WithObjects(objs...).Build() + + // Call the function + err := DeleteOldJobs(cli, repo, keep) + assert.NoError(t, err) + + // Get the remaining jobs + jobList := &batchv1.JobList{} + err = cli.List(context.TODO(), jobList, client.MatchingLabels(map[string]string{RepositoryNameLabel: repo})) + assert.NoError(t, err) + + // We expect the number of jobs to be equal to 'keep' + assert.Len(t, jobList.Items, keep) + + // We expect that the oldest jobs were deleted + // Job3 should not be present in the remaining list + assert.NotContains(t, jobList.Items, objs[2]) + + // Job2 should also not be present in the remaining list + assert.NotContains(t, jobList.Items, objs[1]) +} + +func TestWaitForJobComplete(t *testing.T) { + // Set up test job + job := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-job", + Namespace: "default", + }, + Status: batchv1.JobStatus{}, + } + + schemeFail := runtime.NewScheme() + + scheme := runtime.NewScheme() + batchv1.AddToScheme(scheme) + + waitCompletionBackOff1 := wait.Backoff{ + Duration: time.Second, + Steps: math.MaxInt, + Factor: 2, + Cap: time.Second * 12, + } + + waitCompletionBackOff2 := wait.Backoff{ + Duration: time.Second, + Steps: math.MaxInt, + Factor: 2, + Cap: time.Second * 2, + } + + // Define test cases + tests := []struct { + description string // 
Test case description + kubeClientObj []runtime.Object + runtimeScheme *runtime.Scheme + jobStatus batchv1.JobStatus // Job status to set for the test + logBackOff wait.Backoff + updateAfter time.Duration + expectedLogs int + expectError bool // Whether an error is expected + }{ + { + description: "wait error", + runtimeScheme: schemeFail, + expectError: true, + }, + { + description: "Job Succeeded", + runtimeScheme: scheme, + kubeClientObj: []runtime.Object{ + job, + }, + jobStatus: batchv1.JobStatus{ + Succeeded: 1, + }, + expectError: false, + }, + { + description: "Job Failed", + runtimeScheme: scheme, + kubeClientObj: []runtime.Object{ + job, + }, + jobStatus: batchv1.JobStatus{ + Failed: 1, + }, + expectError: false, + }, + { + description: "Log backoff not to cap", + runtimeScheme: scheme, + kubeClientObj: []runtime.Object{ + job, + }, + logBackOff: waitCompletionBackOff1, + updateAfter: time.Second * 8, + expectedLogs: 3, + }, + { + description: "Log backoff to cap", + runtimeScheme: scheme, + kubeClientObj: []runtime.Object{ + job, + }, + logBackOff: waitCompletionBackOff2, + updateAfter: time.Second * 6, + expectedLogs: 3, + }, + } + + // Run tests + for _, tc := range tests { + t.Run(tc.description, func(t *testing.T) { + // Set the job status + job.Status = tc.jobStatus + // Create a fake Kubernetes client + fakeClientBuilder := fake.NewClientBuilder() + fakeClientBuilder = fakeClientBuilder.WithScheme(tc.runtimeScheme) + fakeClient := fakeClientBuilder.WithRuntimeObjects(tc.kubeClientObj...).Build() + + buffer := []string{} + logger := velerotest.NewMultipleLogger(&buffer) + + waitCompletionBackOff = tc.logBackOff + + if tc.updateAfter != 0 { + go func() { + time.Sleep(tc.updateAfter) + + original := job.DeepCopy() + job.Status.Succeeded = 1 + err := fakeClient.Status().Patch(context.Background(), job, client.MergeFrom(original)) + require.NoError(t, err) + }() + } + + // Call the function + _, err := waitForJobComplete(context.Background(), 
fakeClient, job.Namespace, job.Name, logger) + + // Check if the error matches the expectation + if tc.expectError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + + assert.LessOrEqual(t, len(buffer), tc.expectedLogs) + }) + } +} + +func TestGetResultFromJob(t *testing.T) { + // Set up test job + job := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-job", + Namespace: "default", + }, + } + + // Set up test pod with no status + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: "default", + Labels: map[string]string{"job-name": job.Name}, + }, + } + + // Create a fake Kubernetes client + cli := fake.NewClientBuilder().WithObjects(job, pod).Build() + + // test an error should be returned + result, err := getResultFromJob(cli, job) + assert.Error(t, err) + assert.Equal(t, "", result) + + // Set a non-terminated container status to the pod + pod.Status = v1.PodStatus{ + ContainerStatuses: []v1.ContainerStatus{ + { + State: v1.ContainerState{}, + }, + }, + } + + // Test an error should be returned + cli = fake.NewClientBuilder().WithObjects(job, pod).Build() + result, err = getResultFromJob(cli, job) + assert.Error(t, err) + assert.Equal(t, "", result) + + // Set a terminated container status to the pod + pod.Status = v1.PodStatus{ + ContainerStatuses: []v1.ContainerStatus{ + { + State: v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{ + Message: "test message", + }, + }, + }, + }, + } + + // This call should return the termination message with no error + cli = fake.NewClientBuilder().WithObjects(job, pod).Build() + result, err = getResultFromJob(cli, job) + assert.NoError(t, err) + assert.Equal(t, "test message", result) +} + +func TestGetJobConfig(t *testing.T) { + ctx := context.Background() + logger := logrus.New() + veleroNamespace := "velero" + repoMaintenanceJobConfig := "repo-maintenance-job-config" + repo := &velerov1api.BackupRepository{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: 
veleroNamespace, + Name: repoMaintenanceJobConfig, + }, + Spec: velerov1api.BackupRepositorySpec{ + BackupStorageLocation: "default", + RepositoryType: "kopia", + VolumeNamespace: "test", + }, + } + + testCases := []struct { + name string + repoJobConfig *v1.ConfigMap + expectedConfig *JobConfigs + expectedError error + }{ + { + name: "Config not exist", + expectedConfig: nil, + expectedError: nil, + }, + { + name: "Invalid JSON", + repoJobConfig: &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: veleroNamespace, + Name: repoMaintenanceJobConfig, + }, + Data: map[string]string{ + "test-default-kopia": "{\"cpuRequest:\"100m\"}", + }, + }, + expectedConfig: nil, + expectedError: fmt.Errorf("fail to unmarshal configs from %s", repoMaintenanceJobConfig), + }, + { + name: "Find config specific for BackupRepository", + repoJobConfig: &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: veleroNamespace, + Name: repoMaintenanceJobConfig, + }, + Data: map[string]string{ + "test-default-kopia": "{\"podResources\":{\"cpuRequest\":\"100m\",\"cpuLimit\":\"200m\",\"memoryRequest\":\"100Mi\",\"memoryLimit\":\"200Mi\"},\"loadAffinity\":[{\"nodeSelector\":{\"matchExpressions\":[{\"key\":\"cloud.google.com/machine-family\",\"operator\":\"In\",\"values\":[\"e2\"]}]}}]}", + }, + }, + expectedConfig: &JobConfigs{ + PodResources: &kube.PodResources{ + CPURequest: "100m", + CPULimit: "200m", + MemoryRequest: "100Mi", + MemoryLimit: "200Mi", + }, + LoadAffinities: []*kube.LoadAffinity{ + { + NodeSelector: metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "cloud.google.com/machine-family", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"e2"}, + }, + }, + }, + }, + }, + }, + expectedError: nil, + }, + { + name: "Find config specific for global", + repoJobConfig: &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: veleroNamespace, + Name: repoMaintenanceJobConfig, + }, + Data: map[string]string{ + 
GlobalKeyForRepoMaintenanceJobCM: "{\"podResources\":{\"cpuRequest\":\"50m\",\"cpuLimit\":\"100m\",\"memoryRequest\":\"50Mi\",\"memoryLimit\":\"100Mi\"},\"loadAffinity\":[{\"nodeSelector\":{\"matchExpressions\":[{\"key\":\"cloud.google.com/machine-family\",\"operator\":\"In\",\"values\":[\"n2\"]}]}}]}", + }, + }, + expectedConfig: &JobConfigs{ + PodResources: &kube.PodResources{ + CPURequest: "50m", + CPULimit: "100m", + MemoryRequest: "50Mi", + MemoryLimit: "100Mi", + }, + LoadAffinities: []*kube.LoadAffinity{ + { + NodeSelector: metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "cloud.google.com/machine-family", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"n2"}, + }, + }, + }, + }, + }, + }, + expectedError: nil, + }, + { + name: "Specific config supersede global config", + repoJobConfig: &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: veleroNamespace, + Name: repoMaintenanceJobConfig, + }, + Data: map[string]string{ + GlobalKeyForRepoMaintenanceJobCM: "{\"podResources\":{\"cpuRequest\":\"50m\",\"cpuLimit\":\"100m\",\"memoryRequest\":\"50Mi\",\"memoryLimit\":\"100Mi\"},\"loadAffinity\":[{\"nodeSelector\":{\"matchExpressions\":[{\"key\":\"cloud.google.com/machine-family\",\"operator\":\"In\",\"values\":[\"n2\"]}]}}]}", + "test-default-kopia": "{\"podResources\":{\"cpuRequest\":\"100m\",\"cpuLimit\":\"200m\",\"memoryRequest\":\"100Mi\",\"memoryLimit\":\"200Mi\"},\"loadAffinity\":[{\"nodeSelector\":{\"matchExpressions\":[{\"key\":\"cloud.google.com/machine-family\",\"operator\":\"In\",\"values\":[\"e2\"]}]}}]}", + }, + }, + expectedConfig: &JobConfigs{ + PodResources: &kube.PodResources{ + CPURequest: "100m", + CPULimit: "200m", + MemoryRequest: "100Mi", + MemoryLimit: "200Mi", + }, + LoadAffinities: []*kube.LoadAffinity{ + { + NodeSelector: metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "cloud.google.com/machine-family", + Operator: metav1.LabelSelectorOpIn, + 
Values: []string{"e2"}, + }, + }, + }, + }, + }, + }, + expectedError: nil, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + var fakeClient client.Client + if tc.repoJobConfig != nil { + fakeClient = velerotest.NewFakeControllerRuntimeClient(t, tc.repoJobConfig) + } else { + fakeClient = velerotest.NewFakeControllerRuntimeClient(t) + } + + jobConfig, err := getJobConfig( + ctx, + fakeClient, + logger, + veleroNamespace, + repoMaintenanceJobConfig, + repo, + ) + + if tc.expectedError != nil { + require.ErrorContains(t, err, tc.expectedError.Error()) + } else { + require.NoError(t, err) + } + require.Equal(t, tc.expectedConfig, jobConfig) + }) + } +} + +func TestWaitAllJobsComplete(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) + + veleroNamespace := "velero" + repo := &velerov1api.BackupRepository{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: veleroNamespace, + Name: "fake-repo", + }, + Spec: velerov1api.BackupRepositorySpec{ + BackupStorageLocation: "default", + RepositoryType: "kopia", + VolumeNamespace: "test", + }, + } + + now := time.Now().Round(time.Second) + + jobOtherLabel := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "job1", + Namespace: veleroNamespace, + Labels: map[string]string{RepositoryNameLabel: "other-repo"}, + CreationTimestamp: metav1.Time{Time: now}, + }, + } + + jobIncomplete := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "job1", + Namespace: veleroNamespace, + Labels: map[string]string{RepositoryNameLabel: "fake-repo"}, + CreationTimestamp: metav1.Time{Time: now}, + }, + } + + jobSucceeded1 := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "job1", + Namespace: veleroNamespace, + Labels: map[string]string{RepositoryNameLabel: "fake-repo"}, + CreationTimestamp: metav1.Time{Time: now}, + }, + Status: batchv1.JobStatus{ + StartTime: &metav1.Time{Time: now}, + CompletionTime: &metav1.Time{Time: now.Add(time.Hour)}, + Succeeded: 1, + }, + } + 
+ jobPodSucceeded1 := builder.ForPod(veleroNamespace, "job1").Labels(map[string]string{"job-name": "job1"}).ContainerStatuses(&v1.ContainerStatus{ + State: v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{}, + }, + }).Result() + + jobFailed1 := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "job2", + Namespace: veleroNamespace, + Labels: map[string]string{RepositoryNameLabel: "fake-repo"}, + CreationTimestamp: metav1.Time{Time: now.Add(time.Hour)}, + }, + Status: batchv1.JobStatus{ + StartTime: &metav1.Time{Time: now.Add(time.Hour)}, + CompletionTime: &metav1.Time{Time: now.Add(time.Hour * 2)}, + Failed: 1, + }, + } + + jobPodFailed1 := builder.ForPod(veleroNamespace, "job2").Labels(map[string]string{"job-name": "job2"}).ContainerStatuses(&v1.ContainerStatus{ + State: v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{ + Message: "fake-message-2", + }, + }, + }).Result() + + jobSucceeded2 := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "job3", + Namespace: veleroNamespace, + Labels: map[string]string{RepositoryNameLabel: "fake-repo"}, + CreationTimestamp: metav1.Time{Time: now.Add(time.Hour * 2)}, + }, + Status: batchv1.JobStatus{ + StartTime: &metav1.Time{Time: now.Add(time.Hour * 2)}, + CompletionTime: &metav1.Time{Time: now.Add(time.Hour * 3)}, + Succeeded: 1, + }, + } + + jobPodSucceeded2 := builder.ForPod(veleroNamespace, "job3").Labels(map[string]string{"job-name": "job3"}).ContainerStatuses(&v1.ContainerStatus{ + State: v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{}, + }, + }).Result() + + jobSucceeded3 := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "job4", + Namespace: veleroNamespace, + Labels: map[string]string{RepositoryNameLabel: "fake-repo"}, + CreationTimestamp: metav1.Time{Time: now.Add(time.Hour * 3)}, + }, + Status: batchv1.JobStatus{ + StartTime: &metav1.Time{Time: now.Add(time.Hour * 3)}, + CompletionTime: &metav1.Time{Time: now.Add(time.Hour * 4)}, + Succeeded: 1, + }, + } + + 
jobPodSucceeded3 := builder.ForPod(veleroNamespace, "job4").Labels(map[string]string{"job-name": "job4"}).ContainerStatuses(&v1.ContainerStatus{ + State: v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{}, + }, + }).Result() + + schemeFail := runtime.NewScheme() + + scheme := runtime.NewScheme() + batchv1.AddToScheme(scheme) + v1.AddToScheme(scheme) + + testCases := []struct { + name string + ctx context.Context + kubeClientObj []runtime.Object + runtimeScheme *runtime.Scheme + expectedStatus []velerov1api.BackupRepositoryMaintenanceStatus + expectedError string + }{ + { + name: "list job error", + runtimeScheme: schemeFail, + expectedError: "error listing maintenance job for repo fake-repo: no kind is registered for the type v1.JobList in scheme \"pkg/runtime/scheme.go:100\"", + }, + { + name: "job not exist", + runtimeScheme: scheme, + }, + { + name: "no matching job", + runtimeScheme: scheme, + kubeClientObj: []runtime.Object{ + jobOtherLabel, + }, + }, + { + name: "wait complete error", + ctx: ctx, + runtimeScheme: scheme, + kubeClientObj: []runtime.Object{ + jobIncomplete, + }, + expectedError: "error waiting maintenance job[job1] complete: context deadline exceeded", + }, + { + name: "get result error on succeeded job", + ctx: context.TODO(), + runtimeScheme: scheme, + kubeClientObj: []runtime.Object{ + jobSucceeded1, + }, + expectedStatus: []velerov1api.BackupRepositoryMaintenanceStatus{ + { + Result: velerov1api.BackupRepositoryMaintenanceSucceeded, + StartTimestamp: &metav1.Time{Time: now}, + CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour)}, + }, + }, + }, + { + name: "get result error on failed job", + ctx: context.TODO(), + runtimeScheme: scheme, + kubeClientObj: []runtime.Object{ + jobFailed1, + }, + expectedStatus: []velerov1api.BackupRepositoryMaintenanceStatus{ + { + Result: velerov1api.BackupRepositoryMaintenanceFailed, + StartTimestamp: &metav1.Time{Time: now.Add(time.Hour)}, + CompleteTimestamp: &metav1.Time{Time: 
now.Add(time.Hour * 2)}, + Message: "Repo maintenance failed but result is not retrieveable", + }, + }, + }, + { + name: "less than limit", + ctx: context.TODO(), + runtimeScheme: scheme, + kubeClientObj: []runtime.Object{ + jobFailed1, + jobSucceeded1, + jobPodSucceeded1, + jobPodFailed1, + }, + expectedStatus: []velerov1api.BackupRepositoryMaintenanceStatus{ + { + Result: velerov1api.BackupRepositoryMaintenanceSucceeded, + StartTimestamp: &metav1.Time{Time: now}, + CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour)}, + }, + { + Result: velerov1api.BackupRepositoryMaintenanceFailed, + StartTimestamp: &metav1.Time{Time: now.Add(time.Hour)}, + CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)}, + Message: "fake-message-2", + }, + }, + }, + { + name: "equal to limit", + ctx: context.TODO(), + runtimeScheme: scheme, + kubeClientObj: []runtime.Object{ + jobSucceeded2, + jobFailed1, + jobSucceeded1, + jobPodSucceeded1, + jobPodFailed1, + jobPodSucceeded2, + }, + expectedStatus: []velerov1api.BackupRepositoryMaintenanceStatus{ + { + Result: velerov1api.BackupRepositoryMaintenanceSucceeded, + StartTimestamp: &metav1.Time{Time: now}, + CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour)}, + }, + { + Result: velerov1api.BackupRepositoryMaintenanceFailed, + StartTimestamp: &metav1.Time{Time: now.Add(time.Hour)}, + CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)}, + Message: "fake-message-2", + }, + { + Result: velerov1api.BackupRepositoryMaintenanceSucceeded, + StartTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)}, + CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 3)}, + }, + }, + }, + { + name: "more than limit", + ctx: context.TODO(), + runtimeScheme: scheme, + kubeClientObj: []runtime.Object{ + jobSucceeded3, + jobSucceeded2, + jobFailed1, + jobSucceeded1, + jobPodSucceeded1, + jobPodFailed1, + jobPodSucceeded2, + jobPodSucceeded3, + }, + expectedStatus: []velerov1api.BackupRepositoryMaintenanceStatus{ + { + Result: 
velerov1api.BackupRepositoryMaintenanceFailed, + StartTimestamp: &metav1.Time{Time: now.Add(time.Hour)}, + CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)}, + Message: "fake-message-2", + }, + { + Result: velerov1api.BackupRepositoryMaintenanceSucceeded, + StartTimestamp: &metav1.Time{Time: now.Add(time.Hour * 2)}, + CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 3)}, + }, + { + Result: velerov1api.BackupRepositoryMaintenanceSucceeded, + StartTimestamp: &metav1.Time{Time: now.Add(time.Hour * 3)}, + CompleteTimestamp: &metav1.Time{Time: now.Add(time.Hour * 4)}, + }, + }, + }, + } + + for _, test := range testCases { + t.Run(test.name, func(t *testing.T) { + fakeClientBuilder := fake.NewClientBuilder() + fakeClientBuilder = fakeClientBuilder.WithScheme(test.runtimeScheme) + + fakeClient := fakeClientBuilder.WithRuntimeObjects(test.kubeClientObj...).Build() + + history, err := WaitAllJobsComplete(test.ctx, fakeClient, repo, 3, velerotest.NewLogger()) + + if test.expectedError != "" { + assert.EqualError(t, err, test.expectedError) + } else { + require.NoError(t, err) + } + + assert.Len(t, history, len(test.expectedStatus)) + for i := 0; i < len(test.expectedStatus); i++ { + assert.Equal(t, test.expectedStatus[i].Result, history[i].Result) + assert.Equal(t, test.expectedStatus[i].Message, history[i].Message) + assert.Equal(t, test.expectedStatus[i].StartTimestamp.Time, history[i].StartTimestamp.Time) + assert.Equal(t, test.expectedStatus[i].CompleteTimestamp.Time, history[i].CompleteTimestamp.Time) + } + }) + } + + cancel() +} + +func TestBuildJob(t *testing.T) { + testCases := []struct { + name string + m *JobConfigs + deploy *appsv1.Deployment + logLevel logrus.Level + logFormat *logging.FormatFlag + expectedJobName string + expectedError bool + expectedEnv []v1.EnvVar + expectedEnvFrom []v1.EnvFromSource + }{ + { + name: "Valid maintenance job", + m: &JobConfigs{ + PodResources: &kube.PodResources{ + CPURequest: "100m", + MemoryRequest: 
"128Mi", + CPULimit: "200m", + MemoryLimit: "256Mi", + }, + }, + deploy: &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "velero", + Namespace: "velero", + }, + Spec: appsv1.DeploymentSpec{ + Template: v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "velero-repo-maintenance-container", + Image: "velero-image", + Env: []v1.EnvVar{ + { + Name: "test-name", + Value: "test-value", + }, + }, + EnvFrom: []v1.EnvFromSource{ + { + ConfigMapRef: &v1.ConfigMapEnvSource{ + LocalObjectReference: v1.LocalObjectReference{ + Name: "test-configmap", + }, + }, + }, + { + SecretRef: &v1.SecretEnvSource{ + LocalObjectReference: v1.LocalObjectReference{ + Name: "test-secret", + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + logLevel: logrus.InfoLevel, + logFormat: logging.NewFormatFlag(), + expectedJobName: "test-123-maintain-job", + expectedError: false, + expectedEnv: []v1.EnvVar{ + { + Name: "test-name", + Value: "test-value", + }, + }, + expectedEnvFrom: []v1.EnvFromSource{ + { + ConfigMapRef: &v1.ConfigMapEnvSource{ + LocalObjectReference: v1.LocalObjectReference{ + Name: "test-configmap", + }, + }, + }, + { + SecretRef: &v1.SecretEnvSource{ + LocalObjectReference: v1.LocalObjectReference{ + Name: "test-secret", + }, + }, + }, + }, + }, + { + name: "Error getting Velero server deployment", + m: &JobConfigs{ + PodResources: &kube.PodResources{ + CPURequest: "100m", + MemoryRequest: "128Mi", + CPULimit: "200m", + MemoryLimit: "256Mi", + }, + }, + logLevel: logrus.InfoLevel, + logFormat: logging.NewFormatFlag(), + expectedJobName: "", + expectedError: true, + }, + } + + param := provider.RepoParam{ + BackupRepo: &velerov1api.BackupRepository{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "velero", + Name: "test-123", + }, + Spec: velerov1api.BackupRepositorySpec{ + VolumeNamespace: "test-123", + RepositoryType: "kopia", + }, + }, + BackupLocation: &velerov1api.BackupStorageLocation{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: 
"velero", + Name: "test-location", + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Create a fake clientset with resources + objs := []runtime.Object{param.BackupLocation, param.BackupRepo} + + if tc.deploy != nil { + objs = append(objs, tc.deploy) + } + scheme := runtime.NewScheme() + _ = appsv1.AddToScheme(scheme) + _ = velerov1api.AddToScheme(scheme) + cli := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(objs...).Build() + + // Call the function to test + job, err := buildJob(cli, context.TODO(), param.BackupRepo, param.BackupLocation.Name, tc.m, *tc.m.PodResources, tc.logLevel, tc.logFormat) + + // Check the error + if tc.expectedError { + assert.Error(t, err) + assert.Nil(t, job) + } else { + assert.NoError(t, err) + assert.NotNil(t, job) + assert.Contains(t, job.Name, tc.expectedJobName) + assert.Equal(t, param.BackupRepo.Namespace, job.Namespace) + assert.Equal(t, param.BackupRepo.Name, job.Labels[RepositoryNameLabel]) + + assert.Equal(t, param.BackupRepo.Name, job.Spec.Template.ObjectMeta.Labels[RepositoryNameLabel]) + + // Check container + assert.Len(t, job.Spec.Template.Spec.Containers, 1) + container := job.Spec.Template.Spec.Containers[0] + assert.Equal(t, "velero-repo-maintenance-container", container.Name) + assert.Equal(t, "velero-image", container.Image) + assert.Equal(t, v1.PullIfNotPresent, container.ImagePullPolicy) + + // Check container env + assert.Equal(t, tc.expectedEnv, container.Env) + assert.Equal(t, tc.expectedEnvFrom, container.EnvFrom) + + // Check resources + expectedResources := v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse(tc.m.PodResources.CPURequest), + v1.ResourceMemory: resource.MustParse(tc.m.PodResources.MemoryRequest), + }, + Limits: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse(tc.m.PodResources.CPULimit), + v1.ResourceMemory: resource.MustParse(tc.m.PodResources.MemoryLimit), + }, + } + assert.Equal(t, 
expectedResources, container.Resources) + + // Check args + expectedArgs := []string{ + "repo-maintenance", + fmt.Sprintf("--repo-name=%s", param.BackupRepo.Spec.VolumeNamespace), + fmt.Sprintf("--repo-type=%s", param.BackupRepo.Spec.RepositoryType), + fmt.Sprintf("--backup-storage-location=%s", param.BackupLocation.Name), + fmt.Sprintf("--log-level=%s", tc.logLevel.String()), + fmt.Sprintf("--log-format=%s", tc.logFormat.String()), + } + assert.Equal(t, expectedArgs, container.Args) + + // Check affinity + assert.Nil(t, job.Spec.Template.Spec.Affinity) + + // Check tolerations + assert.Nil(t, job.Spec.Template.Spec.Tolerations) + + // Check node selector + assert.Nil(t, job.Spec.Template.Spec.NodeSelector) + } + }) + } +} diff --git a/pkg/repository/maintenance_test.go b/pkg/repository/maintenance_test.go deleted file mode 100644 index f6344b166a..0000000000 --- a/pkg/repository/maintenance_test.go +++ /dev/null @@ -1,460 +0,0 @@ -/* -Copyright the Velero contributors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package repository - -import ( - "context" - "fmt" - "strings" - "testing" - "time" - - "github.com/sirupsen/logrus" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - batchv1 "k8s.io/api/batch/v1" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/client/fake" - - velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" - velerotest "github.com/vmware-tanzu/velero/pkg/test" - "github.com/vmware-tanzu/velero/pkg/util/kube" -) - -func TestGenerateJobName1(t *testing.T) { - testCases := []struct { - repo string - expectedStart string - }{ - { - repo: "example", - expectedStart: "example-maintain-job-", - }, - { - repo: strings.Repeat("a", 60), - expectedStart: "repo-maintain-job-", - }, - } - - for _, tc := range testCases { - t.Run(tc.repo, func(t *testing.T) { - // Call the function to test - jobName := GenerateJobName(tc.repo) - - // Check if the generated job name starts with the expected prefix - if !strings.HasPrefix(jobName, tc.expectedStart) { - t.Errorf("generated job name does not start with expected prefix") - } - - // Check if the length of the generated job name exceeds the Kubernetes limit - if len(jobName) > 63 { - t.Errorf("generated job name exceeds Kubernetes limit") - } - }) - } -} -func TestDeleteOldMaintenanceJobs(t *testing.T) { - // Set up test repo and keep value - repo := "test-repo" - keep := 2 - - // Create some maintenance jobs for testing - var objs []client.Object - // Create a newer job - newerJob := &batchv1.Job{ - ObjectMeta: metav1.ObjectMeta{ - Name: "job1", - Namespace: "default", - Labels: map[string]string{RepositoryNameLabel: repo}, - }, - Spec: batchv1.JobSpec{}, - } - objs = append(objs, newerJob) - // Create older jobs - for i := 2; i <= 3; i++ { - olderJob := &batchv1.Job{ - ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("job%d", i), - 
Namespace: "default", - Labels: map[string]string{RepositoryNameLabel: repo}, - CreationTimestamp: metav1.Time{ - Time: metav1.Now().Add(time.Duration(-24*i) * time.Hour), - }, - }, - Spec: batchv1.JobSpec{}, - } - objs = append(objs, olderJob) - } - // Create a fake Kubernetes client - scheme := runtime.NewScheme() - _ = batchv1.AddToScheme(scheme) - cli := fake.NewClientBuilder().WithScheme(scheme).WithObjects(objs...).Build() - - // Call the function - err := DeleteOldMaintenanceJobs(cli, repo, keep) - assert.NoError(t, err) - - // Get the remaining jobs - jobList := &batchv1.JobList{} - err = cli.List(context.TODO(), jobList, client.MatchingLabels(map[string]string{RepositoryNameLabel: repo})) - assert.NoError(t, err) - - // We expect the number of jobs to be equal to 'keep' - assert.Len(t, jobList.Items, keep) - - // We expect that the oldest jobs were deleted - // Job3 should not be present in the remaining list - assert.NotContains(t, jobList.Items, objs[2]) - - // Job2 should also not be present in the remaining list - assert.NotContains(t, jobList.Items, objs[1]) -} - -func TestWaitForJobComplete(t *testing.T) { - // Set up test job - job := &batchv1.Job{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-job", - Namespace: "default", - }, - Status: batchv1.JobStatus{}, - } - - // Define test cases - tests := []struct { - description string // Test case description - jobStatus batchv1.JobStatus // Job status to set for the test - expectError bool // Whether an error is expected - }{ - { - description: "Job Succeeded", - jobStatus: batchv1.JobStatus{ - Succeeded: 1, - }, - expectError: false, - }, - { - description: "Job Failed", - jobStatus: batchv1.JobStatus{ - Failed: 1, - }, - expectError: true, - }, - } - - // Run tests - for _, tc := range tests { - t.Run(tc.description, func(t *testing.T) { - // Set the job status - job.Status = tc.jobStatus - // Create a fake Kubernetes client - cli := fake.NewClientBuilder().WithObjects(job).Build() - // Call the 
function - err := WaitForJobComplete(context.Background(), cli, job) - - // Check if the error matches the expectation - if tc.expectError { - assert.Error(t, err) - } else { - assert.NoError(t, err) - } - }) - } -} - -func TestGetMaintenanceResultFromJob(t *testing.T) { - // Set up test job - job := &batchv1.Job{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-job", - Namespace: "default", - }, - } - - // Set up test pod with no status - pod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-pod", - Namespace: "default", - Labels: map[string]string{"job-name": job.Name}, - }, - } - - // Create a fake Kubernetes client - cli := fake.NewClientBuilder().WithObjects(job, pod).Build() - - // test an error should be returned - result, err := GetMaintenanceResultFromJob(cli, job) - assert.Error(t, err) - assert.Equal(t, "", result) - - // Set a non-terminated container status to the pod - pod.Status = v1.PodStatus{ - ContainerStatuses: []v1.ContainerStatus{ - { - State: v1.ContainerState{}, - }, - }, - } - - // Test an error should be returned - cli = fake.NewClientBuilder().WithObjects(job, pod).Build() - result, err = GetMaintenanceResultFromJob(cli, job) - assert.Error(t, err) - assert.Equal(t, "", result) - - // Set a terminated container status to the pod - pod.Status = v1.PodStatus{ - ContainerStatuses: []v1.ContainerStatus{ - { - State: v1.ContainerState{ - Terminated: &v1.ContainerStateTerminated{ - Message: "test message", - }, - }, - }, - }, - } - - // This call should return the termination message with no error - cli = fake.NewClientBuilder().WithObjects(job, pod).Build() - result, err = GetMaintenanceResultFromJob(cli, job) - assert.NoError(t, err) - assert.Equal(t, "test message", result) -} - -func TestGetLatestMaintenanceJob(t *testing.T) { - // Set up test repo - repo := "test-repo" - - // Create some maintenance jobs for testing - var objs []client.Object - // Create a newer job - newerJob := &batchv1.Job{ - ObjectMeta: metav1.ObjectMeta{ - Name: 
"job1", - Namespace: "default", - Labels: map[string]string{RepositoryNameLabel: repo}, - CreationTimestamp: metav1.Time{ - Time: metav1.Now().Add(time.Duration(-24) * time.Hour), - }, - }, - Spec: batchv1.JobSpec{}, - } - objs = append(objs, newerJob) - - // Create an older job - olderJob := &batchv1.Job{ - ObjectMeta: metav1.ObjectMeta{ - Name: "job2", - Namespace: "default", - Labels: map[string]string{RepositoryNameLabel: repo}, - }, - Spec: batchv1.JobSpec{}, - } - objs = append(objs, olderJob) - - // Create a fake Kubernetes client - scheme := runtime.NewScheme() - _ = batchv1.AddToScheme(scheme) - cli := fake.NewClientBuilder().WithScheme(scheme).WithObjects(objs...).Build() - - // Call the function - job, err := GetLatestMaintenanceJob(cli, "default") - assert.NoError(t, err) - - // We expect the returned job to be the newer job - assert.Equal(t, newerJob.Name, job.Name) -} - -func TestGetMaintenanceJobConfig(t *testing.T) { - ctx := context.Background() - logger := logrus.New() - veleroNamespace := "velero" - repoMaintenanceJobConfig := "repo-maintenance-job-config" - repo := &velerov1api.BackupRepository{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: veleroNamespace, - Name: repoMaintenanceJobConfig, - }, - Spec: velerov1api.BackupRepositorySpec{ - BackupStorageLocation: "default", - RepositoryType: "kopia", - VolumeNamespace: "test", - }, - } - - testCases := []struct { - name string - repoJobConfig *v1.ConfigMap - expectedConfig *JobConfigs - expectedError error - }{ - { - name: "Config not exist", - expectedConfig: nil, - expectedError: nil, - }, - { - name: "Invalid JSON", - repoJobConfig: &v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: veleroNamespace, - Name: repoMaintenanceJobConfig, - }, - Data: map[string]string{ - "test-default-kopia": "{\"cpuRequest:\"100m\"}", - }, - }, - expectedConfig: nil, - expectedError: fmt.Errorf("fail to unmarshal configs from %s", repoMaintenanceJobConfig), - }, - { - name: "Find config specific for 
BackupRepository", - repoJobConfig: &v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: veleroNamespace, - Name: repoMaintenanceJobConfig, - }, - Data: map[string]string{ - "test-default-kopia": "{\"podResources\":{\"cpuRequest\":\"100m\",\"cpuLimit\":\"200m\",\"memoryRequest\":\"100Mi\",\"memoryLimit\":\"200Mi\"},\"loadAffinity\":[{\"nodeSelector\":{\"matchExpressions\":[{\"key\":\"cloud.google.com/machine-family\",\"operator\":\"In\",\"values\":[\"e2\"]}]}}]}", - }, - }, - expectedConfig: &JobConfigs{ - PodResources: &kube.PodResources{ - CPURequest: "100m", - CPULimit: "200m", - MemoryRequest: "100Mi", - MemoryLimit: "200Mi", - }, - LoadAffinities: []*kube.LoadAffinity{ - { - NodeSelector: metav1.LabelSelector{ - MatchExpressions: []metav1.LabelSelectorRequirement{ - { - Key: "cloud.google.com/machine-family", - Operator: metav1.LabelSelectorOpIn, - Values: []string{"e2"}, - }, - }, - }, - }, - }, - }, - expectedError: nil, - }, - { - name: "Find config specific for global", - repoJobConfig: &v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: veleroNamespace, - Name: repoMaintenanceJobConfig, - }, - Data: map[string]string{ - GlobalKeyForRepoMaintenanceJobCM: "{\"podResources\":{\"cpuRequest\":\"50m\",\"cpuLimit\":\"100m\",\"memoryRequest\":\"50Mi\",\"memoryLimit\":\"100Mi\"},\"loadAffinity\":[{\"nodeSelector\":{\"matchExpressions\":[{\"key\":\"cloud.google.com/machine-family\",\"operator\":\"In\",\"values\":[\"n2\"]}]}}]}", - }, - }, - expectedConfig: &JobConfigs{ - PodResources: &kube.PodResources{ - CPURequest: "50m", - CPULimit: "100m", - MemoryRequest: "50Mi", - MemoryLimit: "100Mi", - }, - LoadAffinities: []*kube.LoadAffinity{ - { - NodeSelector: metav1.LabelSelector{ - MatchExpressions: []metav1.LabelSelectorRequirement{ - { - Key: "cloud.google.com/machine-family", - Operator: metav1.LabelSelectorOpIn, - Values: []string{"n2"}, - }, - }, - }, - }, - }, - }, - expectedError: nil, - }, - { - name: "Specific config supersede global 
config", - repoJobConfig: &v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: veleroNamespace, - Name: repoMaintenanceJobConfig, - }, - Data: map[string]string{ - GlobalKeyForRepoMaintenanceJobCM: "{\"podResources\":{\"cpuRequest\":\"50m\",\"cpuLimit\":\"100m\",\"memoryRequest\":\"50Mi\",\"memoryLimit\":\"100Mi\"},\"loadAffinity\":[{\"nodeSelector\":{\"matchExpressions\":[{\"key\":\"cloud.google.com/machine-family\",\"operator\":\"In\",\"values\":[\"n2\"]}]}}]}", - "test-default-kopia": "{\"podResources\":{\"cpuRequest\":\"100m\",\"cpuLimit\":\"200m\",\"memoryRequest\":\"100Mi\",\"memoryLimit\":\"200Mi\"},\"loadAffinity\":[{\"nodeSelector\":{\"matchExpressions\":[{\"key\":\"cloud.google.com/machine-family\",\"operator\":\"In\",\"values\":[\"e2\"]}]}}]}", - }, - }, - expectedConfig: &JobConfigs{ - PodResources: &kube.PodResources{ - CPURequest: "100m", - CPULimit: "200m", - MemoryRequest: "100Mi", - MemoryLimit: "200Mi", - }, - LoadAffinities: []*kube.LoadAffinity{ - { - NodeSelector: metav1.LabelSelector{ - MatchExpressions: []metav1.LabelSelectorRequirement{ - { - Key: "cloud.google.com/machine-family", - Operator: metav1.LabelSelectorOpIn, - Values: []string{"e2"}, - }, - }, - }, - }, - }, - }, - expectedError: nil, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - var fakeClient client.Client - if tc.repoJobConfig != nil { - fakeClient = velerotest.NewFakeControllerRuntimeClient(t, tc.repoJobConfig) - } else { - fakeClient = velerotest.NewFakeControllerRuntimeClient(t) - } - - jobConfig, err := GetMaintenanceJobConfig( - ctx, - fakeClient, - logger, - veleroNamespace, - repoMaintenanceJobConfig, - repo, - ) - - if tc.expectedError != nil { - require.ErrorContains(t, err, tc.expectedError.Error()) - } else { - require.NoError(t, err) - } - require.Equal(t, tc.expectedConfig, jobConfig) - }) - } -} diff --git a/pkg/repository/manager/manager.go b/pkg/repository/manager/manager.go index f590f2b145..7027c96500 100644 --- 
a/pkg/repository/manager/manager.go +++ b/pkg/repository/manager/manager.go @@ -23,11 +23,6 @@ import ( "github.com/pkg/errors" "github.com/sirupsen/logrus" - appsv1 "k8s.io/api/apps/v1" - batchv1 "k8s.io/api/batch/v1" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/vmware-tanzu/velero/internal/credentials" @@ -35,9 +30,6 @@ import ( "github.com/vmware-tanzu/velero/pkg/repository" "github.com/vmware-tanzu/velero/pkg/repository/provider" "github.com/vmware-tanzu/velero/pkg/util/filesystem" - "github.com/vmware-tanzu/velero/pkg/util/kube" - "github.com/vmware-tanzu/velero/pkg/util/logging" - veleroutil "github.com/vmware-tanzu/velero/pkg/util/velero" ) // Manager manages backup repositories. @@ -76,16 +68,10 @@ type manager struct { providers map[string]provider.Provider // client is the Velero controller manager's client. // It's limited to resources in the Velero namespace. - client client.Client - repoLocker *repository.RepoLocker - repoEnsurer *repository.Ensurer - fileSystem filesystem.Interface - repoMaintenanceJobConfig string - podResources kube.PodResources - keepLatestMaintenanceJobs int - log logrus.FieldLogger - logLevel logrus.Level - logFormat *logging.FormatFlag + client client.Client + repoLocker *repository.RepoLocker + fileSystem filesystem.Interface + log logrus.FieldLogger } // NewManager create a new repository manager. 
@@ -93,29 +79,17 @@ func NewManager( namespace string, client client.Client, repoLocker *repository.RepoLocker, - repoEnsurer *repository.Ensurer, credentialFileStore credentials.FileStore, credentialSecretStore credentials.SecretStore, - repoMaintenanceJobConfig string, - podResources kube.PodResources, - keepLatestMaintenanceJobs int, log logrus.FieldLogger, - logLevel logrus.Level, - logFormat *logging.FormatFlag, ) Manager { mgr := &manager{ - namespace: namespace, - client: client, - providers: map[string]provider.Provider{}, - repoLocker: repoLocker, - repoEnsurer: repoEnsurer, - fileSystem: filesystem.NewFileSystem(), - repoMaintenanceJobConfig: repoMaintenanceJobConfig, - podResources: podResources, - keepLatestMaintenanceJobs: keepLatestMaintenanceJobs, - log: log, - logLevel: logLevel, - logFormat: logFormat, + namespace: namespace, + client: client, + providers: map[string]provider.Provider{}, + repoLocker: repoLocker, + fileSystem: filesystem.NewFileSystem(), + log: log, } mgr.providers[velerov1api.BackupRepositoryTypeRestic] = provider.NewResticRepositoryProvider(credentialFileStore, mgr.fileSystem, mgr.log) @@ -176,91 +150,20 @@ func (m *manager) PruneRepo(repo *velerov1api.BackupRepository) error { m.repoLocker.LockExclusive(repo.Name) defer m.repoLocker.UnlockExclusive(repo.Name) - param, err := m.assembleRepoParam(repo) + prd, err := m.getRepositoryProvider(repo) if err != nil { return errors.WithStack(err) } - - log := m.log.WithFields(logrus.Fields{ - "BSL name": param.BackupLocation.Name, - "repo type": param.BackupRepo.Spec.RepositoryType, - "repo name": param.BackupRepo.Name, - "repo UID": param.BackupRepo.UID, - }) - - job, err := repository.GetLatestMaintenanceJob(m.client, m.namespace) + param, err := m.assembleRepoParam(repo) if err != nil { return errors.WithStack(err) } - if job != nil && job.Status.Succeeded == 0 && job.Status.Failed == 0 { - log.Debugf("There already has a unfinished maintenance job %s/%s for repository %s, please wait 
for it to complete", job.Namespace, job.Name, param.BackupRepo.Name) - return nil - } - - jobConfig, err := repository.GetMaintenanceJobConfig( - context.Background(), - m.client, - m.log, - m.namespace, - m.repoMaintenanceJobConfig, - repo, - ) - if err != nil { - log.Infof("Fail to find the ConfigMap %s to build maintenance job with error: %s. Use default value.", - m.namespace+"/"+m.repoMaintenanceJobConfig, - err.Error(), - ) - } - - log.Info("Start to maintenance repo") - - maintenanceJob, err := m.buildMaintenanceJob( - jobConfig, - param, - ) - if err != nil { - return errors.Wrap(err, "error to build maintenance job") - } - - log = log.WithField("job", fmt.Sprintf("%s/%s", maintenanceJob.Namespace, maintenanceJob.Name)) - - if err := m.client.Create(context.TODO(), maintenanceJob); err != nil { - return errors.Wrap(err, "error to create maintenance job") - } - log.Debug("Creating maintenance job") - - defer func() { - if err := repository.DeleteOldMaintenanceJobs( - m.client, - param.BackupRepo.Name, - m.keepLatestMaintenanceJobs, - ); err != nil { - log.WithError(err).Error("Failed to delete maintenance job") - } - }() - - var jobErr error - if err := repository.WaitForJobComplete(context.TODO(), m.client, maintenanceJob); err != nil { - log.WithError(err).Error("Error to wait for maintenance job complete") - jobErr = err // we won't return here for job may failed by maintenance failure, we want return the actual error - } - - result, err := repository.GetMaintenanceResultFromJob(m.client, maintenanceJob) - if err != nil { - return errors.Wrap(err, "error to get maintenance job result") - } - - if result != "" { - return errors.New(fmt.Sprintf("Maintenance job %s failed: %s", maintenanceJob.Name, result)) - } - - if jobErr != nil { - return errors.Wrap(jobErr, "error to wait for maintenance job complete") + if err := prd.BoostRepoConnect(context.Background(), param); err != nil { + return errors.WithStack(err) } - log.Info("Maintenance repo complete") - 
return nil + return prd.PruneRepo(context.Background(), param) } func (m *manager) UnlockRepo(repo *velerov1api.BackupRepository) error { @@ -353,122 +256,3 @@ func (m *manager) assembleRepoParam(repo *velerov1api.BackupRepository) (provide BackupRepo: repo, }, nil } - -func (m *manager) buildMaintenanceJob( - config *repository.JobConfigs, - param provider.RepoParam, -) (*batchv1.Job, error) { - // Get the Velero server deployment - deployment := &appsv1.Deployment{} - err := m.client.Get(context.TODO(), types.NamespacedName{Name: "velero", Namespace: m.namespace}, deployment) - if err != nil { - return nil, err - } - - // Get the environment variables from the Velero server deployment - envVars := veleroutil.GetEnvVarsFromVeleroServer(deployment) - - // Get the referenced storage from the Velero server deployment - envFromSources := veleroutil.GetEnvFromSourcesFromVeleroServer(deployment) - - // Get the volume mounts from the Velero server deployment - volumeMounts := veleroutil.GetVolumeMountsFromVeleroServer(deployment) - - // Get the volumes from the Velero server deployment - volumes := veleroutil.GetVolumesFromVeleroServer(deployment) - - // Get the service account from the Velero server deployment - serviceAccount := veleroutil.GetServiceAccountFromVeleroServer(deployment) - - // Get image - image := veleroutil.GetVeleroServerImage(deployment) - - // Set resource limits and requests - cpuRequest := m.podResources.CPURequest - memRequest := m.podResources.MemoryRequest - cpuLimit := m.podResources.CPULimit - memLimit := m.podResources.MemoryLimit - if config != nil && config.PodResources != nil { - cpuRequest = config.PodResources.CPURequest - memRequest = config.PodResources.MemoryRequest - cpuLimit = config.PodResources.CPULimit - memLimit = config.PodResources.MemoryLimit - } - resources, err := kube.ParseResourceRequirements(cpuRequest, memRequest, cpuLimit, memLimit) - if err != nil { - return nil, errors.Wrap(err, "failed to parse resource requirements 
for maintenance job") - } - - // Set arguments - args := []string{"repo-maintenance"} - args = append(args, fmt.Sprintf("--repo-name=%s", param.BackupRepo.Spec.VolumeNamespace)) - args = append(args, fmt.Sprintf("--repo-type=%s", param.BackupRepo.Spec.RepositoryType)) - args = append(args, fmt.Sprintf("--backup-storage-location=%s", param.BackupLocation.Name)) - args = append(args, fmt.Sprintf("--log-level=%s", m.logLevel.String())) - args = append(args, fmt.Sprintf("--log-format=%s", m.logFormat.String())) - - // build the maintenance job - job := &batchv1.Job{ - ObjectMeta: metav1.ObjectMeta{ - Name: repository.GenerateJobName(param.BackupRepo.Name), - Namespace: param.BackupRepo.Namespace, - Labels: map[string]string{ - repository.RepositoryNameLabel: param.BackupRepo.Name, - }, - }, - Spec: batchv1.JobSpec{ - BackoffLimit: new(int32), // Never retry - Template: v1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Name: "velero-repo-maintenance-pod", - Labels: map[string]string{ - repository.RepositoryNameLabel: param.BackupRepo.Name, - }, - }, - Spec: v1.PodSpec{ - Containers: []v1.Container{ - { - Name: "velero-repo-maintenance-container", - Image: image, - Command: []string{ - "/velero", - }, - Args: args, - ImagePullPolicy: v1.PullIfNotPresent, - Env: envVars, - EnvFrom: envFromSources, - VolumeMounts: volumeMounts, - Resources: resources, - }, - }, - RestartPolicy: v1.RestartPolicyNever, - Volumes: volumes, - ServiceAccountName: serviceAccount, - }, - }, - }, - } - - if config != nil && len(config.LoadAffinities) > 0 { - affinity := kube.ToSystemAffinity(config.LoadAffinities) - job.Spec.Template.Spec.Affinity = affinity - } - - if tolerations := veleroutil.GetTolerationsFromVeleroServer(deployment); tolerations != nil { - job.Spec.Template.Spec.Tolerations = tolerations - } - - if nodeSelector := veleroutil.GetNodeSelectorFromVeleroServer(deployment); nodeSelector != nil { - job.Spec.Template.Spec.NodeSelector = nodeSelector - } - - if labels := 
veleroutil.GetVeleroServerLables(deployment); len(labels) > 0 { - job.Spec.Template.Labels = labels - } - - if annotations := veleroutil.GetVeleroServerAnnotations(deployment); len(annotations) > 0 { - job.Spec.Template.Annotations = annotations - } - - return job, nil -} diff --git a/pkg/repository/manager/manager_test.go b/pkg/repository/manager/manager_test.go index e83f5f5276..d743e267ae 100644 --- a/pkg/repository/manager/manager_test.go +++ b/pkg/repository/manager/manager_test.go @@ -17,31 +17,18 @@ limitations under the License. package repository import ( - "fmt" "testing" - "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - appsv1 "k8s.io/api/apps/v1" - v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" kbclient "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/client/fake" velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" - velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" - "github.com/vmware-tanzu/velero/pkg/repository" - "github.com/vmware-tanzu/velero/pkg/repository/provider" - "github.com/vmware-tanzu/velero/pkg/util/kube" - "github.com/vmware-tanzu/velero/pkg/util/logging" ) func TestGetRepositoryProvider(t *testing.T) { var fakeClient kbclient.Client - mgr := NewManager("", fakeClient, nil, nil, nil, nil, "", kube.PodResources{}, 3, nil, logrus.InfoLevel, nil).(*manager) + mgr := NewManager("", fakeClient, nil, nil, nil, nil).(*manager) repo := &velerov1.BackupRepository{} // empty repository type @@ -60,220 +47,3 @@ func TestGetRepositoryProvider(t *testing.T) { _, err = mgr.getRepositoryProvider(repo) require.Error(t, err) } - -func TestBuildMaintenanceJob(t *testing.T) { - testCases := []struct { - name string - m *repository.JobConfigs - deploy *appsv1.Deployment - logLevel logrus.Level - logFormat *logging.FormatFlag - expectedJobName 
string - expectedError bool - expectedEnv []v1.EnvVar - expectedEnvFrom []v1.EnvFromSource - }{ - { - name: "Valid maintenance job", - m: &repository.JobConfigs{ - PodResources: &kube.PodResources{ - CPURequest: "100m", - MemoryRequest: "128Mi", - CPULimit: "200m", - MemoryLimit: "256Mi", - }, - }, - deploy: &appsv1.Deployment{ - ObjectMeta: metav1.ObjectMeta{ - Name: "velero", - Namespace: "velero", - }, - Spec: appsv1.DeploymentSpec{ - Template: v1.PodTemplateSpec{ - Spec: v1.PodSpec{ - Containers: []v1.Container{ - { - Name: "velero-repo-maintenance-container", - Image: "velero-image", - Env: []v1.EnvVar{ - { - Name: "test-name", - Value: "test-value", - }, - }, - EnvFrom: []v1.EnvFromSource{ - { - ConfigMapRef: &v1.ConfigMapEnvSource{ - LocalObjectReference: v1.LocalObjectReference{ - Name: "test-configmap", - }, - }, - }, - { - SecretRef: &v1.SecretEnvSource{ - LocalObjectReference: v1.LocalObjectReference{ - Name: "test-secret", - }, - }, - }, - }, - }, - }, - }, - }, - }, - }, - logLevel: logrus.InfoLevel, - logFormat: logging.NewFormatFlag(), - expectedJobName: "test-123-maintain-job", - expectedError: false, - expectedEnv: []v1.EnvVar{ - { - Name: "test-name", - Value: "test-value", - }, - }, - expectedEnvFrom: []v1.EnvFromSource{ - { - ConfigMapRef: &v1.ConfigMapEnvSource{ - LocalObjectReference: v1.LocalObjectReference{ - Name: "test-configmap", - }, - }, - }, - { - SecretRef: &v1.SecretEnvSource{ - LocalObjectReference: v1.LocalObjectReference{ - Name: "test-secret", - }, - }, - }, - }, - }, - { - name: "Error getting Velero server deployment", - m: &repository.JobConfigs{ - PodResources: &kube.PodResources{ - CPURequest: "100m", - MemoryRequest: "128Mi", - CPULimit: "200m", - MemoryLimit: "256Mi", - }, - }, - logLevel: logrus.InfoLevel, - logFormat: logging.NewFormatFlag(), - expectedJobName: "", - expectedError: true, - }, - } - - param := provider.RepoParam{ - BackupRepo: &velerov1api.BackupRepository{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: 
"velero", - Name: "test-123", - }, - Spec: velerov1api.BackupRepositorySpec{ - VolumeNamespace: "test-123", - RepositoryType: "kopia", - }, - }, - BackupLocation: &velerov1api.BackupStorageLocation{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "velero", - Name: "test-location", - }, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - // Create a fake clientset with resources - objs := []runtime.Object{param.BackupLocation, param.BackupRepo} - - if tc.deploy != nil { - objs = append(objs, tc.deploy) - } - scheme := runtime.NewScheme() - _ = appsv1.AddToScheme(scheme) - _ = velerov1api.AddToScheme(scheme) - cli := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(objs...).Build() - - mgr := NewManager( - "velero", - cli, - nil, - nil, - nil, - nil, - "", - kube.PodResources{}, - 3, - nil, - logrus.InfoLevel, - logging.NewFormatFlag(), - ).(*manager) - - // Call the function to test - job, err := mgr.buildMaintenanceJob(tc.m, param) - - // Check the error - if tc.expectedError { - assert.Error(t, err) - assert.Nil(t, job) - } else { - assert.NoError(t, err) - assert.NotNil(t, job) - assert.Contains(t, job.Name, tc.expectedJobName) - assert.Equal(t, param.BackupRepo.Namespace, job.Namespace) - assert.Equal(t, param.BackupRepo.Name, job.Labels[repository.RepositoryNameLabel]) - - assert.Equal(t, param.BackupRepo.Name, job.Spec.Template.ObjectMeta.Labels[repository.RepositoryNameLabel]) - - // Check container - assert.Len(t, job.Spec.Template.Spec.Containers, 1) - container := job.Spec.Template.Spec.Containers[0] - assert.Equal(t, "velero-repo-maintenance-container", container.Name) - assert.Equal(t, "velero-image", container.Image) - assert.Equal(t, v1.PullIfNotPresent, container.ImagePullPolicy) - - // Check container env - assert.Equal(t, tc.expectedEnv, container.Env) - assert.Equal(t, tc.expectedEnvFrom, container.EnvFrom) - - // Check resources - expectedResources := v1.ResourceRequirements{ - Requests: v1.ResourceList{ 
- v1.ResourceCPU: resource.MustParse(tc.m.PodResources.CPURequest), - v1.ResourceMemory: resource.MustParse(tc.m.PodResources.MemoryRequest), - }, - Limits: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse(tc.m.PodResources.CPULimit), - v1.ResourceMemory: resource.MustParse(tc.m.PodResources.MemoryLimit), - }, - } - assert.Equal(t, expectedResources, container.Resources) - - // Check args - expectedArgs := []string{ - "repo-maintenance", - fmt.Sprintf("--repo-name=%s", param.BackupRepo.Spec.VolumeNamespace), - fmt.Sprintf("--repo-type=%s", param.BackupRepo.Spec.RepositoryType), - fmt.Sprintf("--backup-storage-location=%s", param.BackupLocation.Name), - fmt.Sprintf("--log-level=%s", tc.logLevel.String()), - fmt.Sprintf("--log-format=%s", tc.logFormat.String()), - } - assert.Equal(t, expectedArgs, container.Args) - - // Check affinity - assert.Nil(t, job.Spec.Template.Spec.Affinity) - - // Check tolerations - assert.Nil(t, job.Spec.Template.Spec.Tolerations) - - // Check node selector - assert.Nil(t, job.Spec.Template.Spec.NodeSelector) - } - }) - } -} diff --git a/pkg/test/test_logger.go b/pkg/test/test_logger.go index 65dc8422a7..6a3f3b9196 100644 --- a/pkg/test/test_logger.go +++ b/pkg/test/test_logger.go @@ -62,3 +62,19 @@ func NewSingleLoggerWithHooks(buffer *string, hooks []logrus.Hook) logrus.FieldL return logrus.NewEntry(logger) } + +type multipleLogRecorder struct { + buffer *[]string +} + +func (m *multipleLogRecorder) Write(p []byte) (n int, err error) { + *m.buffer = append(*m.buffer, string(p[:])) + return len(p), nil +} + +func NewMultipleLogger(buffer *[]string) logrus.FieldLogger { + logger := logrus.New() + logger.Out = &multipleLogRecorder{buffer} + logger.Level = logrus.TraceLevel + return logrus.NewEntry(logger) +}