diff --git a/packages/sonataflow-operator/api/v1alpha08/sonataflow_persistence_types.go b/packages/sonataflow-operator/api/v1alpha08/sonataflow_persistence_types.go index 844809375bf..abac8df2282 100644 --- a/packages/sonataflow-operator/api/v1alpha08/sonataflow_persistence_types.go +++ b/packages/sonataflow-operator/api/v1alpha08/sonataflow_persistence_types.go @@ -17,6 +17,14 @@ package v1alpha08 +type DBMigrationStrategyType string + +const ( + DBMigrationStrategyService DBMigrationStrategyType = "service" + DBMigrationStrategyJob DBMigrationStrategyType = "job" + DBMigrationStrategyNone DBMigrationStrategyType = "none" +) + // PlatformPersistenceOptionsSpec configures the DataBase in the platform spec. This specification can // be used by workflows and platform services when they don't provide one of their own. // +optional @@ -54,10 +62,13 @@ type PersistenceOptionsSpec struct { // +optional PostgreSQL *PersistencePostgreSQL `json:"postgresql,omitempty"` - // Whether to migrate database on service startup? + // DB Migration approach for data-index and jobs-service. Use the following values as described. + // job: use job based approach provided by the SonataFlow operator. + // service: service itself shall migrate the db and will not use SonataFlow operator. + // none: no database migration functionality needed. // +optional - // +default: false - MigrateDBOnStartUp bool `json:"migrateDBOnStartUp"` + // +kubebuilder:default:=service + DBMigrationStrategy string `json:"dbMigrationStrategy,omitempty"` } // PersistencePostgreSQL configure postgresql connection for service(s). diff --git a/packages/sonataflow-operator/api/v1alpha08/sonataflowplatform_types.go b/packages/sonataflow-operator/api/v1alpha08/sonataflowplatform_types.go index 433566470fd..47f9786610e 100644 --- a/packages/sonataflow-operator/api/v1alpha08/sonataflowplatform_types.go +++ b/packages/sonataflow-operator/api/v1alpha08/sonataflowplatform_types.go @@ -104,6 +104,31 @@ const ( PlatformDuplicatedReason = "Duplicated" ) +type DBMigrationStatus string + +const ( + DBMigrationStatusStarted DBMigrationStatus = "Started" + DBMigrationStatusInProgress DBMigrationStatus = "In-Progress" + DBMigrationStatusSucceeded DBMigrationStatus = "Succeeded" + DBMigrationStatusFailed DBMigrationStatus = "Failed" + + MessageDBMigrationStatusStarted string = "Started the database migrations for the services" + MessageDBMigrationStatusInProgress string = "The database migrations for the services are in-progress" + MessageDBMigrationStatusSucceeded string = "The database migrations for the services are successful" + MessageDBMigrationStatusFailed string = "The database migrations for the services have failed" + + ReasonDBMigrationStatusStarted string = "Started by SonataFlow operator" + ReasonDBMigrationStatusInProgress string = "The database migration job is in-progress" + ReasonDBMigrationStatusSucceeded string = "The database migration job completed as expected" + ReasonDBMigrationStatusFailed string = "The database may be unreachable, invalid credentials supplied or flyway migration failed. Please check logs for further details." +) + +type SonataFlowPlatformDBMigrationPhase struct { + Status DBMigrationStatus `json:"dbMigrationStatus,omitempty"` + Message string `json:"message,omitempty"` + Reason string `json:"reason,omitempty"` +} + // SonataFlowPlatformStatus defines the observed state of SonataFlowPlatform // +k8s:openapi-gen=true type SonataFlowPlatformStatus struct { @@ -123,6 +148,8 @@ type SonataFlowPlatformStatus struct { // Triggers list of triggers created for the SonataFlowPlatform //+operator-sdk:csv:customresourcedefinitions:type=status,displayName="triggers" Triggers []SonataFlowPlatformTriggerRef `json:"triggers,omitempty"` + //+operator-sdk:csv:customresourcedefinitions:type=status,displayName="dbMigrationStatus" + SonataFlowPlatformDBMigrationPhase *SonataFlowPlatformDBMigrationPhase `json:"sonataFlowPlatformDBMigrationPhase,omitempty"` } // SonataFlowPlatformTriggerRef defines a trigger created for the SonataFlowPlatform. diff --git a/packages/sonataflow-operator/api/v1alpha08/zz_generated.deepcopy.go b/packages/sonataflow-operator/api/v1alpha08/zz_generated.deepcopy.go index e1ccae1731b..2468c5a57fd 100644 --- a/packages/sonataflow-operator/api/v1alpha08/zz_generated.deepcopy.go +++ b/packages/sonataflow-operator/api/v1alpha08/zz_generated.deepcopy.go @@ -1236,6 +1236,21 @@ func (in *SonataFlowPlatform) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SonataFlowPlatformDBMigrationPhase) DeepCopyInto(out *SonataFlowPlatformDBMigrationPhase) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SonataFlowPlatformDBMigrationPhase. +func (in *SonataFlowPlatformDBMigrationPhase) DeepCopy() *SonataFlowPlatformDBMigrationPhase { + if in == nil { + return nil + } + out := new(SonataFlowPlatformDBMigrationPhase) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *SonataFlowPlatformList) DeepCopyInto(out *SonataFlowPlatformList) { *out = *in @@ -1346,6 +1361,11 @@ func (in *SonataFlowPlatformStatus) DeepCopyInto(out *SonataFlowPlatformStatus) *out = make([]SonataFlowPlatformTriggerRef, len(*in)) copy(*out, *in) } + if in.SonataFlowPlatformDBMigrationPhase != nil { + in, out := &in.SonataFlowPlatformDBMigrationPhase, &out.SonataFlowPlatformDBMigrationPhase + *out = new(SonataFlowPlatformDBMigrationPhase) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SonataFlowPlatformStatus. diff --git a/packages/sonataflow-operator/config/manager/controllers_cfg.yaml b/packages/sonataflow-operator/config/manager/controllers_cfg.yaml index 72780469e03..09874461c60 100644 --- a/packages/sonataflow-operator/config/manager/controllers_cfg.yaml +++ b/packages/sonataflow-operator/config/manager/controllers_cfg.yaml @@ -30,6 +30,8 @@ jobsServiceEphemeralImageTag: "docker.io/apache/incubator-kie-kogito-jobs-servic # The Data Index image to use, if empty the operator will use the default Apache Community one based on the current operator's version dataIndexPostgreSQLImageTag: "docker.io/apache/incubator-kie-kogito-data-index-postgresql:main" dataIndexEphemeralImageTag: "docker.io/apache/incubator-kie-kogito-data-index-ephemeral:main" +# The Kogito PostgreSQL DB Migrator image to use (TBD: to replace with apache image) +dbMigratorToolImageTag: "quay.io/rhkp/incubator-kie-kogito-service-db-migration-postgresql:latest" # SonataFlow base builder image used in the internal Dockerfile to build workflow applications in preview profile # Order of precedence is: # 1. SonataFlowPlatform in the given namespace diff --git a/packages/sonataflow-operator/config/manifests/bases/sonataflow-operator.clusterserviceversion.yaml b/packages/sonataflow-operator/config/manifests/bases/sonataflow-operator.clusterserviceversion.yaml index 43e9be99748..42a6fa178cd 100644 --- a/packages/sonataflow-operator/config/manifests/bases/sonataflow-operator.clusterserviceversion.yaml +++ b/packages/sonataflow-operator/config/manifests/bases/sonataflow-operator.clusterserviceversion.yaml @@ -181,6 +181,8 @@ spec: - description: Info generic information related to the Platform displayName: info path: info + - displayName: dbMigrationStatus + path: sonataFlowPlatformDBMigrationPhase - description: Triggers list of triggers created for the SonataFlowPlatform displayName: triggers path: triggers diff --git a/packages/sonataflow-operator/config/rbac/role.yaml b/packages/sonataflow-operator/config/rbac/role.yaml index 13184ed8ca5..4fdd4406c7f 100644 --- a/packages/sonataflow-operator/config/rbac/role.yaml +++ b/packages/sonataflow-operator/config/rbac/role.yaml @@ -21,6 +21,18 @@ kind: ClusterRole metadata: name: manager-role rules: + - apiGroups: + - batch + resources: + - jobs + verbs: + - create + - delete + - get + - list + - patch + - update + - watch - apiGroups: - monitoring.coreos.com resources: diff --git a/packages/sonataflow-operator/internal/controller/cfg/controllers_cfg.go b/packages/sonataflow-operator/internal/controller/cfg/controllers_cfg.go index 927d8d62e20..11c907dcf78 100644 --- a/packages/sonataflow-operator/internal/controller/cfg/controllers_cfg.go +++ b/packages/sonataflow-operator/internal/controller/cfg/controllers_cfg.go @@ -63,6 +63,7 @@ type ControllersCfg struct { JobsServiceEphemeralImageTag string `yaml:"jobsServiceEphemeralImageTag,omitempty"` DataIndexPostgreSQLImageTag string `yaml:"dataIndexPostgreSQLImageTag,omitempty"` DataIndexEphemeralImageTag string `yaml:"dataIndexEphemeralImageTag,omitempty"` + DbMigratorToolImageTag string `yaml:"dbMigratorToolImageTag,omitempty"` SonataFlowBaseBuilderImageTag string `yaml:"sonataFlowBaseBuilderImageTag,omitempty"` SonataFlowDevModeImageTag string `yaml:"sonataFlowDevModeImageTag,omitempty"` BuilderConfigMapName string `yaml:"builderConfigMapName,omitempty"` diff --git a/packages/sonataflow-operator/internal/controller/platform/db_migrator_job.go b/packages/sonataflow-operator/internal/controller/platform/db_migrator_job.go new file mode 100644 index 00000000000..ada369ced03 --- /dev/null +++ b/packages/sonataflow-operator/internal/controller/platform/db_migrator_job.go @@ -0,0 +1,409 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package platform + +import ( + "context" + "errors" + "fmt" + "strconv" + + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog/v2" + "k8s.io/utils/pointer" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + + "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/version" + + operatorapi "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/api/v1alpha08" + "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/container-builder/client" + "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/cfg" + "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/platform/services" + "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles/common/constants" + "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles/common/persistence" + "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/log" +) + +type QuarkusDataSource struct { + JdbcUrl string + Username string + Password string + Schema string +} + +type DBMigratorJob struct { + MigrateDBDataIndex bool + DataIndexDataSource *QuarkusDataSource + MigrateDBJobsService bool + JobsServiceDataSource *QuarkusDataSource +} + +type DBMigratorJobStatus struct { + Name string + BatchJobStatus *batchv1.JobStatus +} + +const ( + dbMigrationJobName = "sonataflow-db-migrator-job" + dbMigrationContainerName = "db-migration-container" + dbMigrationJobFailed = 1 + dbMigrationJobSucceeded = 1 + + migrateDBDataIndex = "MIGRATE_DB_DATAINDEX" + quarkusDataSourceDataIndexJdbcURL = "QUARKUS_DATASOURCE_DATAINDEX_JDBC_URL" + quarkusDataSourceDataIndexUserName = "QUARKUS_DATASOURCE_DATAINDEX_USERNAME" + quarkusDataSourceDataIndexPassword = "QUARKUS_DATASOURCE_DATAINDEX_PASSWORD" + quarkusFlywayDataIndexSchemas = "QUARKUS_FLYWAY_DATAINDEX_SCHEMAS" + + migrateDBJobsService = "MIGRATE_DB_JOBSSERVICE" + quarkusDataSourceJobsServiceJdbcURL = "QUARKUS_DATASOURCE_JOBSSERVICE_JDBC_URL" + quarkusDataSourceJobsServiceUserName = "QUARKUS_DATASOURCE_JOBSSERVICE_USERNAME" + quarkusDataSourceJobsServicePassword = "QUARKUS_DATASOURCE_JOBSSERVICE_PASSWORD" + quarkusFlywayJobsServiceSchemas = "QUARKUS_FLYWAY_JOBSSERVICE_SCHEMAS" +) + +type DBMigrationJobCfg struct { + JobName string + ContainerName string + ToolImageName string +} + +func getJdbcUrl(env []corev1.EnvVar) string { + if env != nil { + for i := 0; i < len(env); i++ { + if env[i].Name == "QUARKUS_DATASOURCE_JDBC_URL" { + return env[i].Value + } + } + } + return "" +} + +// getQuarkusDSFromServicePersistence Returns QuarkusDataSource from service level persistence config +func getQuarkusDSFromServicePersistence(ctx context.Context, platform *operatorapi.SonataFlowPlatform, persistenceOptionsSpec *operatorapi.PersistenceOptionsSpec, defaultSchemaName string) *QuarkusDataSource { + klog.InfoS("Using service level persistence for PostgreSQL", "defaultSchemaName", defaultSchemaName) + quarkusDataSource := &QuarkusDataSource{} + env := persistence.ConfigurePostgreSQLEnv(persistenceOptionsSpec.PostgreSQL, defaultSchemaName, platform.Namespace) + quarkusDataSource.JdbcUrl = getJdbcUrl(env) + quarkusDataSource.Username, _ = services.GetSecretKeyValueString(ctx, persistenceOptionsSpec.PostgreSQL.SecretRef.Name, persistenceOptionsSpec.PostgreSQL.SecretRef.UserKey, platform.Namespace) + quarkusDataSource.Password, _ = services.GetSecretKeyValueString(ctx, persistenceOptionsSpec.PostgreSQL.SecretRef.Name, persistenceOptionsSpec.PostgreSQL.SecretRef.PasswordKey, platform.Namespace) + quarkusDataSource.Schema = persistence.GetDBSchemaName(persistenceOptionsSpec.PostgreSQL, defaultSchemaName) + return quarkusDataSource +} + +// getQuarkusDSFromPlatformPersistence Returns QuarkusDataSource from platform level persistence config +func getQuarkusDSFromPlatformPersistence(ctx context.Context, platform *operatorapi.SonataFlowPlatform, defaultSchemaName string) *QuarkusDataSource { + klog.InfoS("Using platform level persistence for PostgreSQL", "defaultSchemaName", defaultSchemaName) + quarkusDataSource := &QuarkusDataSource{} + postgresql := persistence.MapToPersistencePostgreSQL(platform, defaultSchemaName) + + env := persistence.ConfigurePostgreSQLEnv(postgresql, defaultSchemaName, platform.Namespace) + quarkusDataSource.JdbcUrl = getJdbcUrl(env) + quarkusDataSource.Username, _ = services.GetSecretKeyValueString(ctx, platform.Spec.Persistence.PostgreSQL.SecretRef.Name, platform.Spec.Persistence.PostgreSQL.SecretRef.UserKey, platform.Namespace) + quarkusDataSource.Password, _ = services.GetSecretKeyValueString(ctx, platform.Spec.Persistence.PostgreSQL.SecretRef.Name, platform.Spec.Persistence.PostgreSQL.SecretRef.PasswordKey, platform.Namespace) + quarkusDataSource.Schema = persistence.GetDBSchemaName(postgresql, defaultSchemaName) + return quarkusDataSource +} + +// getQuarkusDataSourceFromPersistence PostgreSQL persistence can be defined at platform level (where both DI and JS will use the same DB defined at platform level) or db can defined at Service level. Service level config will take precedence over platform level config. +func getQuarkusDataSourceFromPersistence(ctx context.Context, platform *operatorapi.SonataFlowPlatform, persistenceOptionsSpec *operatorapi.PersistenceOptionsSpec, defaultSchemaName string) *QuarkusDataSource { + + if persistenceOptionsSpec != nil && persistenceOptionsSpec.PostgreSQL != nil { + return getQuarkusDSFromServicePersistence(ctx, platform, persistenceOptionsSpec, defaultSchemaName) + } else if platform != nil && platform.Spec.Persistence != nil && platform.Spec.Persistence.PostgreSQL != nil { + return getQuarkusDSFromPlatformPersistence(ctx, platform, defaultSchemaName) + } + + return nil +} + +func NewDBMigratorJobData(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, pshDI services.PlatformServiceHandler, pshJS services.PlatformServiceHandler) *DBMigratorJob { + + diJobsBasedDBMigration := false + jsJobsBasedDBMigration := false + + if pshDI.IsPersistenceEnabledtInSpec() { + diJobsBasedDBMigration = services.IsJobsBasedDBMigration(platform.Spec.Services.DataIndex.Persistence) + } + if pshJS.IsPersistenceEnabledtInSpec() { + jsJobsBasedDBMigration = services.IsJobsBasedDBMigration(platform.Spec.Services.JobService.Persistence) + } + + if (pshDI.IsServiceSetInSpec() && diJobsBasedDBMigration) || (pshJS.IsServiceSetInSpec() && jsJobsBasedDBMigration) { + quarkusDataSourceDataIndex := &QuarkusDataSource{} + quarkusDataSourceJobService := &QuarkusDataSource{} + + if diJobsBasedDBMigration { + quarkusDataSourceDataIndex = getQuarkusDataSourceFromPersistence(ctx, platform, platform.Spec.Services.DataIndex.Persistence, "data-index-service") + } + + if jsJobsBasedDBMigration { + quarkusDataSourceJobService = getQuarkusDataSourceFromPersistence(ctx, platform, platform.Spec.Services.JobService.Persistence, "jobs-service") + } + + return &DBMigratorJob{ + MigrateDBDataIndex: diJobsBasedDBMigration, + DataIndexDataSource: quarkusDataSourceDataIndex, + MigrateDBJobsService: jsJobsBasedDBMigration, + JobsServiceDataSource: quarkusDataSourceJobService, + } + } + return nil +} + +// IsJobsBasedDBMigration returns whether job based db migration approach is needed? +func IsJobsBasedDBMigration(platform *operatorapi.SonataFlowPlatform, pshDI services.PlatformServiceHandler, pshJS services.PlatformServiceHandler) bool { + diJobsBasedDBMigration := false + jsJobsBasedDBMigration := false + + if pshDI.IsPersistenceEnabledtInSpec() { + diJobsBasedDBMigration = services.IsJobsBasedDBMigration(platform.Spec.Services.DataIndex.Persistence) + } + if pshJS.IsPersistenceEnabledtInSpec() { + jsJobsBasedDBMigration = services.IsJobsBasedDBMigration(platform.Spec.Services.JobService.Persistence) + } + + return (pshDI.IsServiceSetInSpec() && diJobsBasedDBMigration) || (pshJS.IsServiceSetInSpec() && jsJobsBasedDBMigration) +} + +func createOrUpdateDBMigrationJob(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, pshDI services.PlatformServiceHandler, pshJS services.PlatformServiceHandler) (*DBMigratorJob, error) { + dbMigratorJob := NewDBMigratorJobData(ctx, client, platform, pshDI, pshJS) + + // Invoke DB Migration only if both or either DI/JS services are requested, in addition to DBMigrationStrategyJob + if dbMigratorJob != nil { + job := createJobDBMigration(platform, dbMigratorJob) + klog.V(log.I).InfoS("Starting DB Migration Job: ", "namespace", platform.Namespace, "job", job.Name) + if op, err := controllerutil.CreateOrUpdate(ctx, client, job, func() error { + return nil + }); err != nil { + return dbMigratorJob, err + } else { + klog.V(log.I).InfoS("DB Migration Job successfully created on cluster", "operation", op, "namespace", platform.Namespace, "job", job.Name) + } + } + return dbMigratorJob, nil +} + +// HandleDBMigrationJob Creates db migration job and executes it on the cluster +func HandleDBMigrationJob(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psDI services.PlatformServiceHandler, psJS services.PlatformServiceHandler) (*operatorapi.SonataFlowPlatform, error) { + + dbMigratorJob, err := createOrUpdateDBMigrationJob(ctx, client, platform, psDI, psJS) + if err != nil { + return nil, err + } + if dbMigratorJob != nil { + klog.V(log.E).InfoS("Created DB migration job") + dbMigratorJobStatus, err := dbMigratorJob.ReconcileDBMigrationJob(ctx, client, platform) + if err != nil { + return nil, err + } + if hasFailed(dbMigratorJobStatus) { + return nil, errors.New("DB migration job " + dbMigratorJobStatus.Name + " failed in namespace: " + platform.Namespace) + } else if hasSucceeded(dbMigratorJobStatus) { + return platform, nil + } else { + // DB migration is still running + return nil, nil + } + } + + return platform, nil +} + +func newQuarkusDataSource(jdbcURL string, userName string, password string, schema string) *QuarkusDataSource { + return &QuarkusDataSource{ + JdbcUrl: jdbcURL, + Username: userName, + Password: password, + Schema: schema, + } +} + +func createJobDBMigration(platform *operatorapi.SonataFlowPlatform, dbmj *DBMigratorJob) *batchv1.Job { + // In DB Migrator Tool, smallrye will throw error for empty string "" while initializing properties. + // So use an empty space as a default value. Please see more at: https://github.com/eclipse/microprofile-config/issues/671 + nonEmptyValue := " " + diQuarkusDataSource := newQuarkusDataSource(nonEmptyValue, nonEmptyValue, nonEmptyValue, nonEmptyValue) + jsQuarkusDataSource := newQuarkusDataSource(nonEmptyValue, nonEmptyValue, nonEmptyValue, nonEmptyValue) + + if dbmj.MigrateDBDataIndex && dbmj.DataIndexDataSource != nil { + diQuarkusDataSource.JdbcUrl = dbmj.DataIndexDataSource.JdbcUrl + diQuarkusDataSource.Username = dbmj.DataIndexDataSource.Username + diQuarkusDataSource.Password = dbmj.DataIndexDataSource.Password + diQuarkusDataSource.Schema = dbmj.DataIndexDataSource.Schema + } + + if dbmj.MigrateDBJobsService && dbmj.JobsServiceDataSource != nil { + jsQuarkusDataSource.JdbcUrl = dbmj.JobsServiceDataSource.JdbcUrl + jsQuarkusDataSource.Username = dbmj.JobsServiceDataSource.Username + jsQuarkusDataSource.Password = dbmj.JobsServiceDataSource.Password + jsQuarkusDataSource.Schema = dbmj.JobsServiceDataSource.Schema + } + + dbMigrationJobCfg := newDBMigrationJobCfg() + job := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: dbMigrationJobCfg.JobName, + Namespace: platform.Namespace, + }, + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: dbMigrationJobCfg.ContainerName, + Image: dbMigrationJobCfg.ToolImageName, + Env: []corev1.EnvVar{ + { + Name: migrateDBDataIndex, + Value: strconv.FormatBool(dbmj.MigrateDBDataIndex), + }, + { + Name: quarkusDataSourceDataIndexJdbcURL, + Value: diQuarkusDataSource.JdbcUrl, + }, + { + Name: quarkusDataSourceDataIndexUserName, + Value: diQuarkusDataSource.Username, + }, + { + Name: quarkusDataSourceDataIndexPassword, + Value: diQuarkusDataSource.Password, + }, + { + Name: quarkusFlywayDataIndexSchemas, + Value: diQuarkusDataSource.Schema, + }, + { + Name: migrateDBJobsService, + Value: strconv.FormatBool(dbmj.MigrateDBJobsService), + }, + { + Name: quarkusDataSourceJobsServiceJdbcURL, + Value: jsQuarkusDataSource.JdbcUrl, + }, + { + Name: quarkusDataSourceJobsServiceUserName, + Value: jsQuarkusDataSource.Username, + }, + { + Name: quarkusDataSourceJobsServicePassword, + Value: jsQuarkusDataSource.Password, + }, + { + Name: quarkusFlywayJobsServiceSchemas, + Value: jsQuarkusDataSource.Schema, + }, + }, + }, + }, + RestartPolicy: "Never", + }, + }, + BackoffLimit: pointer.Int32(0), + }, + } + return job +} + +// GetDBMigrationJobStatus Returns db migration job status +func (dbmj DBMigratorJob) GetDBMigrationJobStatus(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform) (*DBMigratorJobStatus, error) { + job, err := client.BatchV1().Jobs(platform.Namespace).Get(ctx, dbMigrationJobName, metav1.GetOptions{}) + if err != nil { + klog.V(log.E).InfoS("Error getting DB migrator job while monitoring completion: ", "error", err, "namespace", platform.Namespace, "job", job.Name) + return nil, err + } + return &DBMigratorJobStatus{job.Name, &job.Status}, nil +} + +// NewSonataFlowPlatformDBMigrationPhase Returns a new DB migration phase for SonataFlowPlatform +func NewSonataFlowPlatformDBMigrationPhase(status operatorapi.DBMigrationStatus, message string, reason string) *operatorapi.SonataFlowPlatformDBMigrationPhase { + return &operatorapi.SonataFlowPlatformDBMigrationPhase{ + Status: status, + Message: message, + Reason: reason, + } +} + +// UpdateSonataFlowPlatformDBMigrationPhase Updates a given SonataFlowPlatformDBMigrationPhase with the supplied values +func UpdateSonataFlowPlatformDBMigrationPhase(dbMigrationStatus *operatorapi.SonataFlowPlatformDBMigrationPhase, status operatorapi.DBMigrationStatus, message string, reason string) *operatorapi.SonataFlowPlatformDBMigrationPhase { + if dbMigrationStatus != nil { + dbMigrationStatus.Status = status + dbMigrationStatus.Message = message + dbMigrationStatus.Reason = reason + return dbMigrationStatus + } + return nil +} + +func getKogitoDBMigratorToolImageName() string { + + imgTag := cfg.GetCfg().DbMigratorToolImageTag + + if imgTag == "" { + // returns "docker.io/apache/incubator-kie-kogito-db-migrator-tool:" + imgTag = fmt.Sprintf("%s-%s:%s", constants.ImageNamePrefix, constants.KogitoDBMigratorTool, version.GetImageTagVersion()) + } + return imgTag +} + +func newDBMigrationJobCfg() *DBMigrationJobCfg { + return &DBMigrationJobCfg{ + JobName: dbMigrationJobName, + ContainerName: dbMigrationContainerName, + ToolImageName: getKogitoDBMigratorToolImageName(), + } +} + +func hasFailed(dbMigratorJobStatus *DBMigratorJobStatus) bool { + return dbMigratorJobStatus.BatchJobStatus.Failed == dbMigrationJobFailed +} + +func hasSucceeded(dbMigratorJobStatus *DBMigratorJobStatus) bool { + return dbMigratorJobStatus.BatchJobStatus.Succeeded == dbMigrationJobSucceeded +} + +// ReconcileDBMigrationJob Check the status of running DB migration job and return status +func (dbmj DBMigratorJob) ReconcileDBMigrationJob(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform) (*DBMigratorJobStatus, error) { + platform.Status.SonataFlowPlatformDBMigrationPhase = NewSonataFlowPlatformDBMigrationPhase(operatorapi.DBMigrationStatusStarted, operatorapi.MessageDBMigrationStatusStarted, operatorapi.ReasonDBMigrationStatusStarted) + + dbMigratorJobStatus, err := dbmj.GetDBMigrationJobStatus(ctx, client, platform) + if err != nil { + return nil, err + } + + klog.V(log.I).InfoS("Db migration job status: ", "namespace", platform.Namespace, "job", dbMigratorJobStatus.Name, "active", dbMigratorJobStatus.BatchJobStatus.Active, "ready", dbMigratorJobStatus.BatchJobStatus.Ready, "failed", dbMigratorJobStatus.BatchJobStatus.Failed, "success", dbMigratorJobStatus.BatchJobStatus.Succeeded, "CompletedIndexes", dbMigratorJobStatus.BatchJobStatus.CompletedIndexes, "terminatedPods", dbMigratorJobStatus.BatchJobStatus.UncountedTerminatedPods) + + if hasFailed(dbMigratorJobStatus) { + platform.Status.SonataFlowPlatformDBMigrationPhase = UpdateSonataFlowPlatformDBMigrationPhase(platform.Status.SonataFlowPlatformDBMigrationPhase, operatorapi.DBMigrationStatusFailed, operatorapi.MessageDBMigrationStatusFailed, operatorapi.ReasonDBMigrationStatusFailed) + klog.V(log.I).InfoS("DB migration job failed", "namespace", platform.Namespace, "job", dbMigratorJobStatus.Name) + return dbMigratorJobStatus, errors.New("DB migration job failed. namespace=" + platform.Namespace + " job=" + dbMigratorJobStatus.Name) + } else if hasSucceeded(dbMigratorJobStatus) { + platform.Status.SonataFlowPlatformDBMigrationPhase = UpdateSonataFlowPlatformDBMigrationPhase(platform.Status.SonataFlowPlatformDBMigrationPhase, operatorapi.DBMigrationStatusSucceeded, operatorapi.MessageDBMigrationStatusSucceeded, operatorapi.ReasonDBMigrationStatusSucceeded) + klog.V(log.I).InfoS("DB migration job succeeded", "namespace", platform.Namespace, "job", dbMigratorJobStatus.Name) + } else { + // DB migration is still running + platform.Status.SonataFlowPlatformDBMigrationPhase = UpdateSonataFlowPlatformDBMigrationPhase(platform.Status.SonataFlowPlatformDBMigrationPhase, operatorapi.DBMigrationStatusInProgress, operatorapi.MessageDBMigrationStatusInProgress, operatorapi.ReasonDBMigrationStatusInProgress) + } + + return dbMigratorJobStatus, nil +} diff --git a/packages/sonataflow-operator/internal/controller/platform/db_migrator_job_test.go b/packages/sonataflow-operator/internal/controller/platform/db_migrator_job_test.go new file mode 100644 index 00000000000..4c8357ec02e --- /dev/null +++ b/packages/sonataflow-operator/internal/controller/platform/db_migrator_job_test.go @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package platform + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/api/v1alpha08" + "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles/common/persistence" + "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/test" +) + +const ( + DataIndexJdbcUrl = "jdbc:postgresql://postgres:5432/sonataflow?currentSchema=data-index-service" + DefaultSchema = "default-schema" + DataIndexSchemaName = "data-index-service" + SonataFlowPlatformName = "db-migration-sonataflow-platform" + DBMigrationStrategyJob = "job" + UserName = "postgres" + Password = "postgres" + DbMigrationJobName = "sonataflow-db-migrator-job" + DbMigrationContainerName = "db-migration-container" +) + +func getBaseSonataFlowPlatformInReadyPhase(namespace string) *v1alpha08.SonataFlowPlatform { + return test.GetBaseSonataFlowPlatformInReadyPhase(namespace) +} + +func TestDbMigratorJob(t *testing.T) { + t.Run("verify the sonataflow-platform first", func(t *testing.T) { + ksp := getBaseSonataFlowPlatformInReadyPhase(t.Name()) + assert.Equal(t, ksp.Name, SonataFlowPlatformName) + assert.Equal(t, ksp.Spec.Services.DataIndex.Persistence.DBMigrationStrategy, DBMigrationStrategyJob) + }) + + t.Run("verify data-index jdbc url", func(t *testing.T) { + ksp := getBaseSonataFlowPlatformInReadyPhase(t.Name()) + env := persistence.ConfigurePostgreSQLEnv(ksp.Spec.Services.DataIndex.Persistence.PostgreSQL, "data-index-schema", ksp.Namespace) + + jdbcUrl := getJdbcUrl(env) + assert.Equal(t, jdbcUrl, DataIndexJdbcUrl) + }) + + t.Run("verify new quarkus data source", func(t *testing.T) { + quarkusDataSource := newQuarkusDataSource(DataIndexJdbcUrl, UserName, Password, DataIndexSchemaName) + assert.Equal(t, quarkusDataSource.Schema, DataIndexSchemaName) + assert.Equal(t, quarkusDataSource.JdbcUrl, DataIndexJdbcUrl) + }) + + t.Run("verify new new db migration job config", func(t *testing.T) { + dbMigrationJobCfg := newDBMigrationJobCfg() + assert.Equal(t, dbMigrationJobCfg.JobName, DbMigrationJobName) + assert.Equal(t, dbMigrationJobCfg.ContainerName, DbMigrationContainerName) + }) +} diff --git a/packages/sonataflow-operator/internal/controller/platform/k8s.go b/packages/sonataflow-operator/internal/controller/platform/k8s.go index db90d01a825..07c2e781146 100644 --- a/packages/sonataflow-operator/internal/controller/platform/k8s.go +++ b/packages/sonataflow-operator/internal/controller/platform/k8s.go @@ -69,13 +69,24 @@ func (action *serviceAction) Handle(ctx context.Context, platform *operatorapi.S } psDI := services.NewDataIndexHandler(platform) + psJS := services.NewJobServiceHandler(platform) + + if IsJobsBasedDBMigration(platform, psDI, psJS) { + p, err := HandleDBMigrationJob(ctx, action.client, platform, psDI, psJS) + if p == nil && err == nil { // DB migration is in-progress + return nil, nil, nil + } else if p == nil && err != nil { // DB migration failed + klog.V(log.E).ErrorS(err, "Error handling DB migration job", "namespace", platform.Namespace) + return nil, nil, err + } + } + if psDI.IsServiceSetInSpec() { if event, err := createOrUpdateServiceComponents(ctx, action.client, platform, psDI); err != nil { return nil, event, err } } - psJS := services.NewJobServiceHandler(platform) if psJS.IsServiceSetInSpec() { if event, err := createOrUpdateServiceComponents(ctx, action.client, platform, psJS); err != nil { return nil, event, err diff --git a/packages/sonataflow-operator/internal/controller/platform/services/secrets.go b/packages/sonataflow-operator/internal/controller/platform/services/secrets.go new file mode 100644 index 00000000000..de7563810bd --- /dev/null +++ b/packages/sonataflow-operator/internal/controller/platform/services/secrets.go @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package services + +import ( + "context" + + corev1 "k8s.io/api/core/v1" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/log" + "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/utils" +) + +func GetSecretKeyValueString(ctx context.Context, secretName string, secretKey string, nameSpace string) (string, error) { + secret := corev1.Secret{} + err := utils.GetClient().Get(ctx, ctrl.ObjectKey{Namespace: nameSpace, Name: secretName}, &secret) + + if err != nil { + klog.V(log.E).InfoS("Error extracting secret: ", "namespace", nameSpace, "error", err) + return "", err + } + + return string(secret.Data[secretKey]), nil +} diff --git a/packages/sonataflow-operator/internal/controller/platform/services/services.go b/packages/sonataflow-operator/internal/controller/platform/services/services.go index 03427c548f6..6ef5ac4204a 100644 --- a/packages/sonataflow-operator/internal/controller/platform/services/services.go +++ b/packages/sonataflow-operator/internal/controller/platform/services/services.go @@ -21,6 +21,7 @@ package services import ( "fmt" + "strconv" "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/version" @@ -45,6 +46,7 @@ import ( "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles" "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/utils/kubernetes" + "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/api/v1alpha08" operatorapi "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/api/v1alpha08" "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles/common/constants" "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles/common/persistence" @@ -96,6 +98,8 @@ type PlatformServiceHandler interface { IsServiceSetInSpec() bool // IsServiceEnabledInSpec returns true if the service is enabled in the spec. IsServiceEnabledInSpec() bool + // IsPersistenceEnabledtInSpec returns true if the service has persistence set in the spec. + IsPersistenceEnabledtInSpec() bool // GetLocalServiceBaseUrl returns the base url of the local service GetLocalServiceBaseUrl() string // GetServiceBaseUrl returns the base url of the service, based on whether using local or cluster-scoped service. @@ -112,12 +116,20 @@ type PlatformServiceHandler interface { // Check if K_SINK has injected for Job Service. No Op for Data Index CheckKSinkInjected() (bool, error) + + // Returns whether job based, service based or no DB migration is needed + GetDBMigrationStrategy() operatorapi.DBMigrationStrategyType } type DataIndexHandler struct { platform *operatorapi.SonataFlowPlatform } +// GetDBMigrationStrategy returns DB migration approach +func (d *DataIndexHandler) GetDBMigrationStrategy() operatorapi.DBMigrationStrategyType { + return GetDBMigrationStrategy(d.platform.Spec.Services.DataIndex.Persistence) +} + func NewDataIndexHandler(platform *operatorapi.SonataFlowPlatform) PlatformServiceHandler { return &DataIndexHandler{platform: platform} } @@ -174,6 +186,10 @@ func (d *DataIndexHandler) IsServiceEnabledInSpec() bool { return isDataIndexEnabled(d.platform) } +func (d DataIndexHandler) IsPersistenceEnabledtInSpec() bool { + return d.IsServiceSetInSpec() && d.platform.Spec.Services.DataIndex.Persistence != nil +} + func (d *DataIndexHandler) isServiceEnabledInStatus() bool { return d.platform != nil && d.platform.Status.ClusterPlatformRef != nil && d.platform.Status.ClusterPlatformRef.Services != nil && d.platform.Status.ClusterPlatformRef.Services.DataIndexRef != nil && @@ -233,18 +249,51 @@ func (d *DataIndexHandler) hasPostgreSQLConfigured() bool { (d.platform.Spec.Persistence != nil && d.platform.Spec.Persistence.PostgreSQL != nil)) } +func GetDBMigrationStrategy(persistence *operatorapi.PersistenceOptionsSpec) operatorapi.DBMigrationStrategyType { + dbMigrationStrategy := operatorapi.DBMigrationStrategyNone + + if persistence != nil { + return operatorapi.DBMigrationStrategyType(persistence.DBMigrationStrategy) + } + + return dbMigrationStrategy +} + +func IsServiceBasedDBMigration(persistence *operatorapi.PersistenceOptionsSpec) bool { + dbMigrationStrategy := GetDBMigrationStrategy(persistence) + return dbMigrationStrategy == operatorapi.DBMigrationStrategyService +} + +func IsJobsBasedDBMigration(persistence *operatorapi.PersistenceOptionsSpec) bool { + dbMigrationStrategy := GetDBMigrationStrategy(persistence) + return dbMigrationStrategy == operatorapi.DBMigrationStrategyJob +} + +func IsNoDBMigration(persistence *operatorapi.PersistenceOptionsSpec) bool { + dbMigrationStrategy := GetDBMigrationStrategy(persistence) + return dbMigrationStrategy == operatorapi.DBMigrationStrategyNone || dbMigrationStrategy == "" +} + +func isDBMigrationStrategyService(persistence *v1alpha08.PersistenceOptionsSpec) string { + dbMigrationStrategyService := "true" + if persistence != nil { + dbMigrationStrategyService = strconv.FormatBool(IsServiceBasedDBMigration(persistence)) + } + + return dbMigrationStrategyService +} + func (d *DataIndexHandler) ConfigurePersistence(containerSpec *corev1.Container) *corev1.Container { if d.hasPostgreSQLConfigured() { p := persistence.RetrievePostgreSQLConfiguration(d.platform.Spec.Services.DataIndex.Persistence, d.platform.Spec.Persistence, d.GetServiceName()) c := containerSpec.DeepCopy() c.Image = d.GetServiceImageName(constants.PersistenceTypePostgreSQL) c.Env = append(c.Env, persistence.ConfigurePostgreSQLEnv(p.PostgreSQL, d.GetServiceName(), d.platform.Namespace)...) - // TODO upcoming work as part of the DB Migrator incorporation should continue where - // assignments like -> migrateDBOnStart := strconv.FormatBool(d.platform.Spec.Services.DataIndex.Persistence.MigrateDBOnStartUp) introduces nil pointer references, - // since Services, and services Persistence are optional references. + + dbMigrationStrategyService := isDBMigrationStrategyService(d.platform.Spec.Services.DataIndex.Persistence) // specific to DataIndex - c.Env = append(c.Env, corev1.EnvVar{Name: quarkusHibernateORMDatabaseGeneration, Value: "update"}, corev1.EnvVar{Name: quarkusFlywayMigrateAtStart, Value: "true"}) + c.Env = append(c.Env, corev1.EnvVar{Name: quarkusHibernateORMDatabaseGeneration, Value: "update"}, corev1.EnvVar{Name: quarkusFlywayMigrateAtStart, Value: dbMigrationStrategyService}) return c } return containerSpec @@ -291,6 +340,11 @@ type JobServiceHandler struct { platform *operatorapi.SonataFlowPlatform } +// GetDBMigrationStrategy returns db migration approach otherwise +func (j *JobServiceHandler) GetDBMigrationStrategy() operatorapi.DBMigrationStrategyType { + return GetDBMigrationStrategy(j.platform.Spec.Services.JobService.Persistence) +} + func NewJobServiceHandler(platform *operatorapi.SonataFlowPlatform) PlatformServiceHandler { return &JobServiceHandler{platform: platform} } @@ -351,6 +405,10 @@ func (j *JobServiceHandler) IsServiceEnabledInSpec() bool { return isJobServiceEnabled(j.platform) } +func (j JobServiceHandler) IsPersistenceEnabledtInSpec() bool { + return j.IsServiceSetInSpec() && j.platform.Spec.Services.JobService.Persistence != nil +} + func (j *JobServiceHandler) isServiceEnabledInStatus() bool { return j.platform != nil && j.platform.Status.ClusterPlatformRef != nil && j.platform.Status.ClusterPlatformRef.Services != nil && j.platform.Status.ClusterPlatformRef.Services.JobServiceRef != nil && @@ -424,12 +482,11 @@ func (j *JobServiceHandler) ConfigurePersistence(containerSpec *corev1.Container c.Image = j.GetServiceImageName(constants.PersistenceTypePostgreSQL) p := persistence.RetrievePostgreSQLConfiguration(j.platform.Spec.Services.JobService.Persistence, j.platform.Spec.Persistence, j.GetServiceName()) c.Env = append(c.Env, persistence.ConfigurePostgreSQLEnv(p.PostgreSQL, j.GetServiceName(), j.platform.Namespace)...) - // TODO upcoming work as part of the DB Migrator incorporation should continue where - // assignments like -> migrateDBOnStart := strconv.FormatBool(j.platform.Spec.Services.JobService.Persistence.MigrateDBOnStartUp) introduces nil pointer references, - // since Services, and services Persistence are optional references. + + dbMigrationStrategyService := isDBMigrationStrategyService(j.platform.Spec.Services.JobService.Persistence) // Specific to Job Service - c.Env = append(c.Env, corev1.EnvVar{Name: "QUARKUS_FLYWAY_MIGRATE_AT_START", Value: "true"}) + c.Env = append(c.Env, corev1.EnvVar{Name: "QUARKUS_FLYWAY_MIGRATE_AT_START", Value: dbMigrationStrategyService}) c.Env = append(c.Env, corev1.EnvVar{Name: "KOGITO_JOBS_SERVICE_LOADJOBERRORSTRATEGY", Value: "FAIL_SERVICE"}) return c } diff --git a/packages/sonataflow-operator/internal/controller/profiles/common/constants/platform_services.go b/packages/sonataflow-operator/internal/controller/profiles/common/constants/platform_services.go index 4aef6076eeb..38b6d4d299b 100644 --- a/packages/sonataflow-operator/internal/controller/profiles/common/constants/platform_services.go +++ b/packages/sonataflow-operator/internal/controller/profiles/common/constants/platform_services.go @@ -76,9 +76,11 @@ const ( JobServiceName = "jobs-service" ImageNamePrefix = "docker.io/apache/incubator-kie-kogito" DataIndexName = "data-index" + KogitoDBMigratorTool = "db-migrator-tool" - DefaultDatabaseName string = "sonataflow" - DefaultPostgreSQLPort int = 5432 + DefaultPostgresServiceName string = "postgresql" + DefaultDatabaseName string = "sonataflow" + DefaultPostgreSQLPort int = 5432 ) type PersistenceType string diff --git a/packages/sonataflow-operator/internal/controller/profiles/common/persistence/postgresql.go b/packages/sonataflow-operator/internal/controller/profiles/common/persistence/postgresql.go index e0aa6456230..a90a774c9b9 100644 --- a/packages/sonataflow-operator/internal/controller/profiles/common/persistence/postgresql.go +++ b/packages/sonataflow-operator/internal/controller/profiles/common/persistence/postgresql.go @@ -19,6 +19,7 @@ package persistence import ( "fmt" + "strings" "github.com/magiconair/properties" @@ -170,3 +171,58 @@ func GetPostgreSQLWorkflowProperties(workflow *operatorapi.SonataFlow) *properti } return props } + +// GetDBSchemaName Parses jdbc url and returns the schema name +func GetDBSchemaName(persistencePostgreSQL *operatorapi.PersistencePostgreSQL, defaultSchemaName string) string { + if persistencePostgreSQL != nil && persistencePostgreSQL.ServiceRef != nil && len(persistencePostgreSQL.ServiceRef.DatabaseSchema) > 0 { + return persistencePostgreSQL.ServiceRef.DatabaseSchema + } + + if persistencePostgreSQL != nil && len(persistencePostgreSQL.JdbcUrl) > 0 { + jdbcURL := persistencePostgreSQL.JdbcUrl + _, a, found := strings.Cut(jdbcURL, "currentSchema=") + + if found { + if strings.Contains(a, "&") { + b, _, found := strings.Cut(a, "&") + if found { + return b + } + } else { + return a + } + } + } + return defaultSchemaName +} + +func MapToPersistencePostgreSQL(platform *operatorapi.SonataFlowPlatform, defaultSchemaName string) *operatorapi.PersistencePostgreSQL { + if platform.Spec.Persistence != nil && platform.Spec.Persistence.PostgreSQL != nil { + persistencePostgreSQL := &operatorapi.PersistencePostgreSQL{} + persistencePostgreSQL.SecretRef = platform.Spec.Persistence.PostgreSQL.SecretRef + + if len(platform.Spec.Persistence.PostgreSQL.JdbcUrl) > 0 { + persistencePostgreSQL.JdbcUrl = platform.Spec.Persistence.PostgreSQL.JdbcUrl + } + + serviceRef := &operatorapi.PostgreSQLServiceOptions{} + if platform.Spec.Persistence.PostgreSQL.ServiceRef != nil { + + serviceRef.DatabaseSchema = defaultSchemaName + serviceRef.SQLServiceOptions = &operatorapi.SQLServiceOptions{} + + if len(platform.Spec.Persistence.PostgreSQL.ServiceRef.Name) > 0 { + serviceRef.SQLServiceOptions.Name = platform.Spec.Persistence.PostgreSQL.ServiceRef.Name + } + + if len(platform.Spec.Persistence.PostgreSQL.ServiceRef.DatabaseName) > 0 { + serviceRef.SQLServiceOptions.DatabaseName = platform.Spec.Persistence.PostgreSQL.ServiceRef.DatabaseName + } + + persistencePostgreSQL.ServiceRef = serviceRef + } + return persistencePostgreSQL + } + + return nil +} diff --git a/packages/sonataflow-operator/internal/controller/sonataflowplatform_controller.go b/packages/sonataflow-operator/internal/controller/sonataflowplatform_controller.go index 1a87f072cd9..36a6146fc9a 100644 --- a/packages/sonataflow-operator/internal/controller/sonataflowplatform_controller.go +++ b/packages/sonataflow-operator/internal/controller/sonataflowplatform_controller.go @@ -71,6 +71,7 @@ type SonataFlowPlatformReconciler struct { //+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflowplatforms,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflowplatforms/status,verbs=get;update;patch //+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflowplatforms/finalizers,verbs=update +//+kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. @@ -181,6 +182,10 @@ func (r *SonataFlowPlatformReconciler) Reconcile(ctx context.Context, req reconc if err != nil { return reconcile.Result{}, err } + } else { + return reconcile.Result{ + RequeueAfter: 5 * time.Second, + }, nil } // handle one action at time so the resource diff --git a/packages/sonataflow-operator/internal/controller/sonataflowplatform_controller_test.go b/packages/sonataflow-operator/internal/controller/sonataflowplatform_controller_test.go index b1de0db704a..407b84beb7d 100644 --- a/packages/sonataflow-operator/internal/controller/sonataflowplatform_controller_test.go +++ b/packages/sonataflow-operator/internal/controller/sonataflowplatform_controller_test.go @@ -277,14 +277,14 @@ func TestSonataFlowPlatformController(t *testing.T) { DataIndex: &v1alpha08.DataIndexServiceSpec{ ServiceSpec: v1alpha08.ServiceSpec{ Persistence: &v1alpha08.PersistenceOptionsSpec{ - MigrateDBOnStartUp: false, + DBMigrationStrategy: "none", }, }, }, JobService: &v1alpha08.JobServiceServiceSpec{ ServiceSpec: v1alpha08.ServiceSpec{ Persistence: &v1alpha08.PersistenceOptionsSpec{ - MigrateDBOnStartUp: false, + DBMigrationStrategy: "none", }, }, }, diff --git a/packages/sonataflow-operator/operator.yaml b/packages/sonataflow-operator/operator.yaml index 050b16eec7a..359e526df3c 100644 --- a/packages/sonataflow-operator/operator.yaml +++ b/packages/sonataflow-operator/operator.yaml @@ -1219,9 +1219,14 @@ spec: by default. maxProperties: 2 properties: - migrateDBOnStartUp: - description: Whether to migrate database on service startup? - type: boolean + dbMigrationStrategy: + default: service + description: |- + DB Migration approach for data-index and jobs-service. Use the following values as described. + job: use job based approach provided by the SonataFlow operator. + service: service itself shall migrate the db and will not use SonataFlow operator. + none: no database migration functionality needed. + type: string postgresql: description: Connect configured services to a postgresql database. @@ -9247,9 +9252,14 @@ spec: by default. maxProperties: 2 properties: - migrateDBOnStartUp: - description: Whether to migrate database on service startup? - type: boolean + dbMigrationStrategy: + default: service + description: |- + DB Migration approach for data-index and jobs-service. Use the following values as described. + job: use job based approach provided by the SonataFlow operator. + service: service itself shall migrate the db and will not use SonataFlow operator. + none: no database migration functionality needed. + type: string postgresql: description: Connect configured services to a postgresql database. @@ -17416,6 +17426,15 @@ spec: description: The generation observed by the deployment controller. format: int64 type: integer + sonataFlowPlatformDBMigrationPhase: + properties: + dbMigrationStatus: + type: string + message: + type: string + reason: + type: string + type: object triggers: description: Triggers list of triggers created for the SonataFlowPlatform items: @@ -19403,9 +19422,14 @@ spec: for the workflow maxProperties: 2 properties: - migrateDBOnStartUp: - description: Whether to migrate database on service startup? - type: boolean + dbMigrationStrategy: + default: service + description: |- + DB Migration approach for data-index and jobs-service. Use the following values as described. + job: use job based approach provided by the SonataFlow operator. + service: service itself shall migrate the db and will not use SonataFlow operator. + none: no database migration functionality needed. + type: string postgresql: description: Connect configured services to a postgresql database. maxProperties: 2 @@ -27738,6 +27762,18 @@ kind: ClusterRole metadata: name: sonataflow-operator-manager-role rules: + - apiGroups: + - batch + resources: + - jobs + verbs: + - create + - delete + - get + - list + - patch + - update + - watch - apiGroups: - monitoring.coreos.com resources: @@ -28148,6 +28184,8 @@ data: # The Data Index image to use, if empty the operator will use the default Apache Community one based on the current operator's version dataIndexPostgreSQLImageTag: "docker.io/apache/incubator-kie-kogito-data-index-postgresql:main" dataIndexEphemeralImageTag: "docker.io/apache/incubator-kie-kogito-data-index-ephemeral:main" + # The Kogito PostgreSQL DB Migrator image to use (TBD: to replace with apache image) + dbMigratorToolImageTag: "quay.io/rhkp/incubator-kie-kogito-service-db-migration-postgresql:latest" # SonataFlow base builder image used in the internal Dockerfile to build workflow applications in preview profile # Order of precedence is: # 1. SonataFlowPlatform in the given namespace diff --git a/packages/sonataflow-operator/test/e2e/platform_test.go b/packages/sonataflow-operator/test/e2e/platform_test.go index c7074176d85..671a0705623 100644 --- a/packages/sonataflow-operator/test/e2e/platform_test.go +++ b/packages/sonataflow-operator/test/e2e/platform_test.go @@ -77,6 +77,46 @@ var _ = Describe("Platform Use Cases :: ", Label("platform"), Ordered, func() { } } }) + + var _ = Describe("Db migration :: ", Ordered, func() { + + Describe("ensure service based db migration", func() { + projectDir, _ := utils.GetProjectDir() + It("should successfully deploy the SonataFlowPlatform with data index and jobs service", func() { + By("Deploy the CR") + var manifests []byte + EventuallyWithOffset(1, func() error { + var err error + cmd := exec.Command("kubectl", "kustomize", filepath.Join(projectDir, + "test/e2e/testdata/platform/persistence/service_based_db_migration")) + manifests, err = utils.Run(cmd) + return err + }, time.Minute, time.Second).Should(Succeed()) + cmd := exec.Command("kubectl", "create", "-n", targetNamespace, "-f", "-") + cmd.Stdin = bytes.NewBuffer(manifests) + _, err := utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + + By("Wait for SonatatFlowPlatform CR to complete deployment") + // wait for service deployments to be ready + EventuallyWithOffset(1, func() error { + cmd = exec.Command("kubectl", "wait", "pod", "-n", targetNamespace, "-l", "app.kubernetes.io/name in (jobs-service,data-index-service)", "--for", "condition=Ready", "--timeout=5s") + _, err = utils.Run(cmd) + return err + }, 10*time.Minute, 5).Should(Succeed()) + + By("Evaluate status of all service's health endpoint") + cmd = exec.Command("kubectl", "get", "pod", "-l", "app.kubernetes.io/name in (jobs-service,data-index-service)", "-n", targetNamespace, "-ojsonpath={.items[*].metadata.name}") + output, err := utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + for _, pn := range strings.Split(string(output), " ") { + verifyHealthStatusInPod(pn, targetNamespace) + } + }) + }) + + }) + var _ = Context("with platform services", func() { DescribeTable("when creating a simple workflow", func(testcaseDir string, profile metadata.ProfileType, persistenceType string) { @@ -186,6 +226,60 @@ var _ = Describe("Platform Use Cases :: ", Label("platform"), Ordered, func() { }, Entry("and both Job Service and Data Index using the persistence from platform CR", test.GetPathFromE2EDirectory("platform", "persistence", "generic_from_platform_cr")), Entry("and both Job Service and Data Index using the one defined in each service, discarding the one from the platform CR", test.GetPathFromE2EDirectory("platform", "persistence", "overwritten_by_services")), + Entry("Job Service and Data Index come up with service based db migration", test.GetPathFromE2EDirectory("platform", "persistence", "service_based_db_migration")), + ) + + DescribeTable("when deploying a SonataFlowPlatform CR with PostgreSQL Persistence and using Job based DB migration", func(testcaseDir string) { + By("Deploy the Postgres DB") + var manifests []byte + EventuallyWithOffset(1, func() error { + var err error + cmd := exec.Command("kubectl", "kustomize", testcaseDir+"/pg-service") + manifests, err = utils.Run(cmd) + return err + }, time.Minute, time.Second).Should(Succeed()) + cmd := exec.Command("kubectl", "create", "-n", targetNamespace, "-f", "-") + cmd.Stdin = bytes.NewBuffer(manifests) + _, err := utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + + By("Wait for Postgres DB to come alive") + // wait for service deployments to be ready + EventuallyWithOffset(1, func() error { + cmd = exec.Command("kubectl", "wait", "pod", "-n", targetNamespace, "-l", "app.kubernetes.io/name in (postgres)", "--for", "condition=Ready", "--timeout=5s") + _, err = utils.Run(cmd) + return err + }, 5*time.Minute, 5).Should(Succeed()) + + By("Deploy the CR") + EventuallyWithOffset(1, func() error { + var err error + cmd := exec.Command("kubectl", "kustomize", testcaseDir+"/sonataflow-platform") + manifests, err = utils.Run(cmd) + return err + }, time.Minute, time.Second).Should(Succeed()) + cmd = exec.Command("kubectl", "create", "-n", targetNamespace, "-f", "-") + cmd.Stdin = bytes.NewBuffer(manifests) + _, err = utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + + By("Wait for SonatatFlowPlatform CR to complete deployment") + // wait for service deployments to be ready + EventuallyWithOffset(1, func() error { + cmd = exec.Command("kubectl", "wait", "pod", "-n", targetNamespace, "-l", "app.kubernetes.io/name in (jobs-service,data-index-service)", "--for", "condition=Ready", "--timeout=5s") + _, err = utils.Run(cmd) + return err + }, 10*time.Minute, 5).Should(Succeed()) + + By("Evaluate status of all service's health endpoint") + cmd = exec.Command("kubectl", "get", "pod", "-l", "app.kubernetes.io/name in (jobs-service,data-index-service)", "-n", targetNamespace, "-ojsonpath={.items[*].metadata.name}") + output, err := utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + for _, pn := range strings.Split(string(output), " ") { + verifyHealthStatusInPod(pn, targetNamespace) + } + }, + Entry("Job Service and Data Index come up with job based db migration", test.GetPathFromE2EDirectory("platform", "persistence", "job_based_db_migration")), ) DescribeTable("when deploying a SonataFlowPlatform CR with brokers", func(testcaseDir string) { diff --git a/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr/02-sonataflow_platform.yaml b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr/02-sonataflow_platform.yaml index cac40d62b5d..c0dbfdf2051 100644 --- a/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr/02-sonataflow_platform.yaml +++ b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/generic_from_platform_cr/02-sonataflow_platform.yaml @@ -34,7 +34,7 @@ spec: dataIndex: enabled: false persistence: - migrateDBOnStartUp: true + dbMigrationStrategy: service podTemplate: initContainers: - name: init-postgres @@ -49,7 +49,7 @@ spec: jobService: enabled: false persistence: - migrateDBOnStartUp: true + dbMigrationStrategy: service podTemplate: initContainers: - name: init-postgres diff --git a/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/job_based_db_migration/pg-service/01-postgres.yaml b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/job_based_db_migration/pg-service/01-postgres.yaml new file mode 100644 index 00000000000..89f6e6fc81d --- /dev/null +++ b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/job_based_db_migration/pg-service/01-postgres.yaml @@ -0,0 +1,89 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + labels: + app.kubernetes.io/name: postgres + name: postgres-pvc +spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 1Gi +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + app.kubernetes.io/name: postgres + name: postgres +spec: + replicas: 1 + selector: + matchLabels: + app.kubernetes.io/name: postgres + template: + metadata: + labels: + app.kubernetes.io/name: postgres + spec: + containers: + - name: postgres + image: postgres:13.2-alpine + imagePullPolicy: "IfNotPresent" + ports: + - containerPort: 5432 + volumeMounts: + - name: storage + mountPath: /var/lib/postgresql/data + envFrom: + - secretRef: + name: postgres-secrets + readinessProbe: + exec: + command: ["pg_isready"] + initialDelaySeconds: 15 + timeoutSeconds: 2 + livenessProbe: + exec: + command: ["pg_isready"] + initialDelaySeconds: 15 + timeoutSeconds: 2 + resources: + limits: + memory: "256Mi" + cpu: "500m" + volumes: + - name: storage + persistentVolumeClaim: + claimName: postgres-pvc +--- +apiVersion: v1 +kind: Service +metadata: + labels: + app.kubernetes.io/name: postgres + name: postgres +spec: + selector: + app.kubernetes.io/name: postgres + ports: + - port: 5432 diff --git a/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/job_based_db_migration/pg-service/kustomization.yaml b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/job_based_db_migration/pg-service/kustomization.yaml new file mode 100644 index 00000000000..eaea57c0e69 --- /dev/null +++ b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/job_based_db_migration/pg-service/kustomization.yaml @@ -0,0 +1,33 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +resources: + - 01-postgres.yaml + +generatorOptions: + disableNameSuffixHash: true + +secretGenerator: + - name: postgres-secrets + literals: + - POSTGRES_USER=sonataflow + - POSTGRES_PASSWORD=sonataflow + - POSTGRES_DATABASE=sonataflow + - PGDATA=/var/lib/pgsql/data/userdata + +sortOptions: + order: fifo diff --git a/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/job_based_db_migration/sonataflow-platform/01-sonataflow_platform.yaml b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/job_based_db_migration/sonataflow-platform/01-sonataflow_platform.yaml new file mode 100644 index 00000000000..aa234f3c64d --- /dev/null +++ b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/job_based_db_migration/sonataflow-platform/01-sonataflow_platform.yaml @@ -0,0 +1,47 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +apiVersion: sonataflow.org/v1alpha08 +kind: SonataFlowPlatform +metadata: + name: sonataflow-platform +spec: + build: + config: + strategyOptions: + KanikoBuildCacheEnabled: "true" + services: + dataIndex: + enabled: true + persistence: + dbMigrationStrategy: job + postgresql: + jdbcUrl: jdbc:postgresql://postgres:5432/sonataflow?currentSchema=data-index-service + secretRef: + name: postgres-secrets + userKey: POSTGRES_USER + passwordKey: POSTGRES_PASSWORD + jobService: + enabled: true + persistence: + dbMigrationStrategy: job + postgresql: + jdbcUrl: jdbc:postgresql://postgres:5432/sonataflow?currentSchema=jobs-service + secretRef: + name: postgres-secrets + userKey: POSTGRES_USER + passwordKey: POSTGRES_PASSWORD diff --git a/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/job_based_db_migration/sonataflow-platform/kustomization.yaml b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/job_based_db_migration/sonataflow-platform/kustomization.yaml new file mode 100644 index 00000000000..601a6418c7c --- /dev/null +++ b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/job_based_db_migration/sonataflow-platform/kustomization.yaml @@ -0,0 +1,22 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +resources: + - 01-sonataflow_platform.yaml + +generatorOptions: + disableNameSuffixHash: true diff --git a/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/overwritten_by_services/02-sonataflow_platform.yaml b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/overwritten_by_services/02-sonataflow_platform.yaml index 2fa8bf05dd7..aeb44768b59 100644 --- a/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/overwritten_by_services/02-sonataflow_platform.yaml +++ b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/overwritten_by_services/02-sonataflow_platform.yaml @@ -38,7 +38,7 @@ spec: dataIndex: enabled: false persistence: - migrateDBOnStartUp: true + dbMigrationStrategy: service postgresql: jdbcUrl: jdbc:postgresql://postgres:5432/sonataflow?currentSchema=data-index-service secretRef: @@ -59,7 +59,7 @@ spec: jobService: enabled: false persistence: - migrateDBOnStartUp: true + dbMigrationStrategy: service postgresql: jdbcUrl: jdbc:postgresql://postgres:5432/sonataflow?currentSchema=jobs-service secretRef: diff --git a/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/service_based_db_migration/01-postgres.yaml b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/service_based_db_migration/01-postgres.yaml new file mode 100644 index 00000000000..89f6e6fc81d --- /dev/null +++ b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/service_based_db_migration/01-postgres.yaml @@ -0,0 +1,89 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + labels: + app.kubernetes.io/name: postgres + name: postgres-pvc +spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 1Gi +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + app.kubernetes.io/name: postgres + name: postgres +spec: + replicas: 1 + selector: + matchLabels: + app.kubernetes.io/name: postgres + template: + metadata: + labels: + app.kubernetes.io/name: postgres + spec: + containers: + - name: postgres + image: postgres:13.2-alpine + imagePullPolicy: "IfNotPresent" + ports: + - containerPort: 5432 + volumeMounts: + - name: storage + mountPath: /var/lib/postgresql/data + envFrom: + - secretRef: + name: postgres-secrets + readinessProbe: + exec: + command: ["pg_isready"] + initialDelaySeconds: 15 + timeoutSeconds: 2 + livenessProbe: + exec: + command: ["pg_isready"] + initialDelaySeconds: 15 + timeoutSeconds: 2 + resources: + limits: + memory: "256Mi" + cpu: "500m" + volumes: + - name: storage + persistentVolumeClaim: + claimName: postgres-pvc +--- +apiVersion: v1 +kind: Service +metadata: + labels: + app.kubernetes.io/name: postgres + name: postgres +spec: + selector: + app.kubernetes.io/name: postgres + ports: + - port: 5432 diff --git a/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/service_based_db_migration/02-sonataflow_platform.yaml b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/service_based_db_migration/02-sonataflow_platform.yaml new file mode 100644 index 00000000000..01b77b1111f --- /dev/null +++ b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/service_based_db_migration/02-sonataflow_platform.yaml @@ -0,0 +1,69 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +apiVersion: sonataflow.org/v1alpha08 +kind: SonataFlowPlatform +metadata: + name: sonataflow-platform +spec: + build: + config: + strategyOptions: + KanikoBuildCacheEnabled: "true" + services: + dataIndex: + enabled: true + persistence: + dbMigrationStrategy: service + postgresql: + jdbcUrl: jdbc:postgresql://postgres:5432/sonataflow?currentSchema=data-index-service + secretRef: + name: postgres-secrets + userKey: POSTGRES_USER + passwordKey: POSTGRES_PASSWORD + podTemplate: + initContainers: + - name: init-postgres + image: registry.access.redhat.com/ubi9/ubi-micro:latest + imagePullPolicy: IfNotPresent + command: + [ + "sh", + "-c", + 'until (echo 1 > /dev/tcp/postgres.$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace).svc.cluster.local/5432) >/dev/null 2>&1; do echo "Waiting for postgres server"; sleep 3; done;', + ] + jobService: + enabled: true + persistence: + dbMigrationStrategy: service + postgresql: + jdbcUrl: jdbc:postgresql://postgres:5432/sonataflow?currentSchema=jobs-service + secretRef: + name: postgres-secrets + userKey: POSTGRES_USER + passwordKey: POSTGRES_PASSWORD + podTemplate: + initContainers: + - name: init-postgres + image: registry.access.redhat.com/ubi9/ubi-micro:latest + imagePullPolicy: IfNotPresent + command: + [ + "sh", + "-c", + 'until (echo 1 > /dev/tcp/postgres.$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace).svc.cluster.local/5432) >/dev/null 2>&1; do echo "Waiting for postgres server"; sleep 3; done;', + ] diff --git a/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/service_based_db_migration/kustomization.yaml b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/service_based_db_migration/kustomization.yaml new file mode 100644 index 00000000000..48d72cbd0b8 --- /dev/null +++ b/packages/sonataflow-operator/test/e2e/testdata/platform/persistence/service_based_db_migration/kustomization.yaml @@ -0,0 +1,34 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +resources: + - 01-postgres.yaml + - 02-sonataflow_platform.yaml + +generatorOptions: + disableNameSuffixHash: true + +secretGenerator: + - name: postgres-secrets + literals: + - POSTGRES_USER=sonataflow + - POSTGRES_PASSWORD=sonataflow + - POSTGRES_DATABASE=sonataflow + - PGDATA=/var/lib/pgsql/data/userdata + +sortOptions: + order: fifo diff --git a/packages/sonataflow-operator/test/e2e/testdata/platform/services/dev/postgreSQL/02-sonataflow_platform.yaml b/packages/sonataflow-operator/test/e2e/testdata/platform/services/dev/postgreSQL/02-sonataflow_platform.yaml index 22b909957d7..ee1942f1c70 100644 --- a/packages/sonataflow-operator/test/e2e/testdata/platform/services/dev/postgreSQL/02-sonataflow_platform.yaml +++ b/packages/sonataflow-operator/test/e2e/testdata/platform/services/dev/postgreSQL/02-sonataflow_platform.yaml @@ -29,7 +29,7 @@ spec: dataIndex: enabled: false persistence: - migrateDBOnStartUp: true + dbMigrationStrategy: service postgresql: jdbcUrl: jdbc:postgresql://postgres:5432/sonataflow?currentSchema=data-index-service secretRef: @@ -50,7 +50,7 @@ spec: jobService: enabled: false persistence: - migrateDBOnStartUp: true + dbMigrationStrategy: service postgresql: jdbcUrl: jdbc:postgresql://postgres:5432/sonataflow?currentSchema=jobs-service secretRef: diff --git a/packages/sonataflow-operator/test/e2e/testdata/platform/services/gitops/knative/platform-level-broker/02-sonataflow_platform.yaml b/packages/sonataflow-operator/test/e2e/testdata/platform/services/gitops/knative/platform-level-broker/02-sonataflow_platform.yaml index 08d57743c97..b452ae61434 100644 --- a/packages/sonataflow-operator/test/e2e/testdata/platform/services/gitops/knative/platform-level-broker/02-sonataflow_platform.yaml +++ b/packages/sonataflow-operator/test/e2e/testdata/platform/services/gitops/knative/platform-level-broker/02-sonataflow_platform.yaml @@ -35,7 +35,7 @@ spec: dataIndex: enabled: true persistence: - migrateDBOnStartUp: true + dbMigrationStrategy: service postgresql: jdbcUrl: jdbc:postgresql://postgres:5432/sonataflow?currentSchema=data-index-service secretRef: @@ -63,7 +63,7 @@ spec: jobService: enabled: true persistence: - migrateDBOnStartUp: true + dbMigrationStrategy: service postgresql: jdbcUrl: jdbc:postgresql://postgres:5432/sonataflow?currentSchema=jobs-service secretRef: diff --git a/packages/sonataflow-operator/test/e2e/testdata/platform/services/gitops/knative/service-level-broker/02-sonataflow_platform.yaml b/packages/sonataflow-operator/test/e2e/testdata/platform/services/gitops/knative/service-level-broker/02-sonataflow_platform.yaml index 94e21966577..6237c38335e 100644 --- a/packages/sonataflow-operator/test/e2e/testdata/platform/services/gitops/knative/service-level-broker/02-sonataflow_platform.yaml +++ b/packages/sonataflow-operator/test/e2e/testdata/platform/services/gitops/knative/service-level-broker/02-sonataflow_platform.yaml @@ -39,7 +39,7 @@ spec: kind: Broker name: di-source persistence: - migrateDBOnStartUp: true + dbMigrationStrategy: service postgresql: jdbcUrl: jdbc:postgresql://postgres:5432/sonataflow?currentSchema=data-index-service secretRef: @@ -77,7 +77,7 @@ spec: kind: Broker name: js-sink persistence: - migrateDBOnStartUp: true + dbMigrationStrategy: service postgresql: jdbcUrl: jdbc:postgresql://postgres:5432/sonataflow?currentSchema=jobs-service secretRef: diff --git a/packages/sonataflow-operator/test/e2e/testdata/platform/services/gitops/postgreSQL/02-sonataflow_platform.yaml b/packages/sonataflow-operator/test/e2e/testdata/platform/services/gitops/postgreSQL/02-sonataflow_platform.yaml index 07ef78107ba..b11ef02df55 100644 --- a/packages/sonataflow-operator/test/e2e/testdata/platform/services/gitops/postgreSQL/02-sonataflow_platform.yaml +++ b/packages/sonataflow-operator/test/e2e/testdata/platform/services/gitops/postgreSQL/02-sonataflow_platform.yaml @@ -34,7 +34,7 @@ spec: dataIndex: enabled: true persistence: - migrateDBOnStartUp: true + dbMigrationStrategy: service postgresql: jdbcUrl: jdbc:postgresql://postgres:5432/sonataflow?currentSchema=data-index-service secretRef: @@ -62,7 +62,7 @@ spec: jobService: enabled: true persistence: - migrateDBOnStartUp: true + dbMigrationStrategy: service postgresql: jdbcUrl: jdbc:postgresql://postgres:5432/sonataflow?currentSchema=jobs-service secretRef: diff --git a/packages/sonataflow-operator/test/e2e/testdata/workflows/persistence/from_platform_with_di_and_js_services/02-sonataflow_platform.yaml b/packages/sonataflow-operator/test/e2e/testdata/workflows/persistence/from_platform_with_di_and_js_services/02-sonataflow_platform.yaml index 37b9ef95a40..8495db1ebe4 100644 --- a/packages/sonataflow-operator/test/e2e/testdata/workflows/persistence/from_platform_with_di_and_js_services/02-sonataflow_platform.yaml +++ b/packages/sonataflow-operator/test/e2e/testdata/workflows/persistence/from_platform_with_di_and_js_services/02-sonataflow_platform.yaml @@ -44,7 +44,7 @@ spec: dataIndex: enabled: true persistence: - migrateDBOnStartUp: true + dbMigrationStrategy: service podTemplate: initContainers: - name: init-postgres @@ -59,7 +59,7 @@ spec: jobService: enabled: true persistence: - migrateDBOnStartUp: true + dbMigrationStrategy: service podTemplate: initContainers: - name: init-postgres diff --git a/packages/sonataflow-operator/test/e2e/testdata/workflows/persistence/from_platform_with_no_persistence_required/03-sonataflow_callbackstatetimeouts-no-persistence.sw.yaml b/packages/sonataflow-operator/test/e2e/testdata/workflows/persistence/from_platform_with_no_persistence_required/03-sonataflow_callbackstatetimeouts-no-persistence.sw.yaml index f4f7bb0f8a5..da10f52b3ec 100644 --- a/packages/sonataflow-operator/test/e2e/testdata/workflows/persistence/from_platform_with_no_persistence_required/03-sonataflow_callbackstatetimeouts-no-persistence.sw.yaml +++ b/packages/sonataflow-operator/test/e2e/testdata/workflows/persistence/from_platform_with_no_persistence_required/03-sonataflow_callbackstatetimeouts-no-persistence.sw.yaml @@ -24,7 +24,6 @@ metadata: sonataflow.org/version: 0.0.1 sonataflow.org/profile: gitops spec: - persistence: { migrateDBOnStartUp: true } podTemplate: replicas: 0 container: diff --git a/packages/sonataflow-operator/test/testdata/db_migrator_sonataflow_platform.yaml b/packages/sonataflow-operator/test/testdata/db_migrator_sonataflow_platform.yaml new file mode 100644 index 00000000000..20cb5055687 --- /dev/null +++ b/packages/sonataflow-operator/test/testdata/db_migrator_sonataflow_platform.yaml @@ -0,0 +1,47 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +apiVersion: sonataflow.org/v1alpha08 +kind: SonataFlowPlatform +metadata: + name: db-migration-sonataflow-platform +spec: + build: + config: + strategyOptions: + KanikoBuildCacheEnabled: "true" + services: + dataIndex: + enabled: true + persistence: + dbMigrationStrategy: job + postgresql: + jdbcUrl: jdbc:postgresql://postgres:5432/sonataflow?currentSchema=data-index-service + secretRef: + name: postgres-secrets + userKey: POSTGRES_USER + passwordKey: POSTGRES_PASSWORD + jobService: + enabled: true + persistence: + dbMigrationStrategy: job + postgresql: + jdbcUrl: jdbc:postgresql://postgres:5432/sonataflow?currentSchema=jobs-service + secretRef: + name: postgres-secrets + userKey: POSTGRES_USER + passwordKey: POSTGRES_PASSWORD diff --git a/packages/sonataflow-operator/test/yaml.go b/packages/sonataflow-operator/test/yaml.go index 56139718a21..dff7aa215f9 100644 --- a/packages/sonataflow-operator/test/yaml.go +++ b/packages/sonataflow-operator/test/yaml.go @@ -61,6 +61,7 @@ const ( sonataFlowBuildSucceed = "sonataflow.org_v1alpha08_sonataflowbuild.yaml" knativeDefaultBrokerCR = "knative_default_broker.yaml" manifestsPath = "bundle/manifests/" + DBMigrationSonataFlowPlatform = "db_migrator_sonataflow_platform.yaml" ) var projectDir = "" @@ -242,6 +243,10 @@ func GetBasePlatformInReadyPhase(namespace string) *operatorapi.SonataFlowPlatfo return GetSonataFlowPlatformInReadyPhase(sonataFlowPlatformYamlCR, namespace) } +func GetBaseSonataFlowPlatformInReadyPhase(namespace string) *operatorapi.SonataFlowPlatform { + return GetSonataFlowPlatformInReadyPhase(DBMigrationSonataFlowPlatform, namespace) +} + func GetBasePlatformWithBaseImageInReadyPhase(namespace string) *operatorapi.SonataFlowPlatform { platform := GetBasePlatform() platform.Namespace = namespace