Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
rhkp committed Jan 16, 2025
1 parent 32a4588 commit 6042723
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ type DBMigratorJob struct {
JobsServiceDataSource *QuarkusDataSource
}

type DBMigratorJobStatus struct {
Name string
BatchJobStatus *batchv1.JobStatus
}

const (
dbMigrationJobName = "sonataflow-db-migrator-job"
dbMigrationContainerName = "db-migration-container"
Expand Down Expand Up @@ -228,9 +233,9 @@ func HandleDBMigrationJob(ctx context.Context, client client.Client, platform *o
if err != nil {
return nil, err
}
if dbMigratorJobStatus.Failed == 1 {
return nil, errors.New("DB migration job failed")
} else if dbMigratorJobStatus.Succeeded == 1 {
if dbMigratorJobStatus.BatchJobStatus.Failed == 1 {
return nil, errors.New("DB migration job " + dbMigratorJobStatus.Name + " failed in namespace: " + platform.Namespace)
} else if dbMigratorJobStatus.BatchJobStatus.Succeeded == 1 {
return platform, nil
} else {
// DB migration is still running
Expand Down Expand Up @@ -339,13 +344,13 @@ func createJobDBMigration(platform *operatorapi.SonataFlowPlatform, dbmj *DBMigr
}

// GetDBMigrationJobStatus Returns db migration job status
func (dbmj DBMigratorJob) GetDBMigrationJobStatus(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform) (*batchv1.JobStatus, error) {
func (dbmj DBMigratorJob) GetDBMigrationJobStatus(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform) (*DBMigratorJobStatus, error) {
job, err := client.BatchV1().Jobs(platform.Namespace).Get(ctx, dbMigrationJobName, metav1.GetOptions{})
if err != nil {
klog.V(log.E).InfoS("Error getting DB migrator job while monitoring completion: ", "error", err)
klog.V(log.E).InfoS("Error getting DB migrator job while monitoring completion: ", "error", err, "namespace", platform.Namespace, "job", job.Name)
return nil, err
}
return &job.Status, nil
return &DBMigratorJobStatus{job.Name, &job.Status}, nil
}

// NewSonataFlowPlatformDBMigrationPhase Returns a new DB migration phase for SonataFlowPlatform
Expand Down Expand Up @@ -388,24 +393,32 @@ func newDBMigrationJobCfg() *DBMigrationJobCfg {
}
}

func hasFailed(dbMigratorJobStatus *DBMigratorJobStatus) bool {
return dbMigratorJobStatus.BatchJobStatus.Failed == dbMigrationJobFailed
}

func hasSucceeded(dbMigratorJobStatus *DBMigratorJobStatus) bool {
return dbMigratorJobStatus.BatchJobStatus.Succeeded == dbMigrationJobSucceeded
}

// ReconcileDBMigrationJob Check the status of running DB migration job and return status
func (dbmj DBMigratorJob) ReconcileDBMigrationJob(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform) (*batchv1.JobStatus, error) {
func (dbmj DBMigratorJob) ReconcileDBMigrationJob(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform) (*DBMigratorJobStatus, error) {
platform.Status.SonataFlowPlatformDBMigrationPhase = NewSonataFlowPlatformDBMigrationPhase(operatorapi.DBMigrationStatusStarted, operatorapi.MessageDBMigrationStatusStarted, operatorapi.ReasonDBMigrationStatusStarted)

dbMigratorJobStatus, err := dbmj.GetDBMigrationJobStatus(ctx, client, platform)
if err != nil {
return nil, err
}

klog.V(log.I).InfoS("Db migration job status: ", "active", dbMigratorJobStatus.Active, "ready", dbMigratorJobStatus.Ready, "failed", dbMigratorJobStatus.Failed, "success", dbMigratorJobStatus.Succeeded, "CompletedIndexes", dbMigratorJobStatus.CompletedIndexes, "terminatedPods", dbMigratorJobStatus.UncountedTerminatedPods)
klog.V(log.I).InfoS("Db migration job status: ", "namespace", platform.Namespace, "job", dbMigratorJobStatus.Name, "active", dbMigratorJobStatus.BatchJobStatus.Active, "ready", dbMigratorJobStatus.BatchJobStatus.Ready, "failed", dbMigratorJobStatus.BatchJobStatus.Failed, "success", dbMigratorJobStatus.BatchJobStatus.Succeeded, "CompletedIndexes", dbMigratorJobStatus.BatchJobStatus.CompletedIndexes, "terminatedPods", dbMigratorJobStatus.BatchJobStatus.UncountedTerminatedPods)

if dbMigratorJobStatus.Failed == dbMigrationJobFailed {
if hasFailed(dbMigratorJobStatus) {
platform.Status.SonataFlowPlatformDBMigrationPhase = UpdateSonataFlowPlatformDBMigrationPhase(platform.Status.SonataFlowPlatformDBMigrationPhase, operatorapi.DBMigrationStatusFailed, operatorapi.MessageDBMigrationStatusFailed, operatorapi.ReasonDBMigrationStatusFailed)
klog.V(log.I).InfoS("DB migration job failed")
return dbMigratorJobStatus, errors.New("DB migration job failed")
} else if dbMigratorJobStatus.Succeeded == dbMigrationJobSucceeded {
klog.V(log.I).InfoS("DB migration job failed", "namespace", platform.Namespace, "job", dbMigratorJobStatus.Name)
return dbMigratorJobStatus, errors.New("DB migration job failed. namespace=" + platform.Namespace + " job=" + dbMigratorJobStatus.Name )
} else if hasSucceeded(dbMigratorJobStatus) {
platform.Status.SonataFlowPlatformDBMigrationPhase = UpdateSonataFlowPlatformDBMigrationPhase(platform.Status.SonataFlowPlatformDBMigrationPhase, operatorapi.DBMigrationStatusSucceeded, operatorapi.MessageDBMigrationStatusSucceeded, operatorapi.ReasonDBMigrationStatusSucceeded)
klog.V(log.I).InfoS("DB migration job succeeded")
klog.V(log.I).InfoS("DB migration job succeeded", "namespace", platform.Namespace, "job", dbMigratorJobStatus.Name)
} else {
// DB migration is still running
platform.Status.SonataFlowPlatformDBMigrationPhase = UpdateSonataFlowPlatformDBMigrationPhase(platform.Status.SonataFlowPlatformDBMigrationPhase, operatorapi.DBMigrationStatusInProgress, operatorapi.MessageDBMigrationStatusInProgress, operatorapi.ReasonDBMigrationStatusInProgress)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ func (action *serviceAction) Handle(ctx context.Context, platform *operatorapi.S

if IsJobsBasedDBMigration(platform, psDI, psJS) {
p, err := HandleDBMigrationJob(ctx, action.client, platform, psDI, psJS)
if p == nil {
if p == nil && err == nil { // DB migration is in-progress
return nil, nil, nil
} else if p == nil && err != nil { // DB migration failed
klog.V(log.E).ErrorS(err, "Error handling DB migration job", "namespace", platform.Namespace)
return nil, nil, err
}
Expand Down

0 comments on commit 6042723

Please sign in to comment.