diff --git a/.github/ISSUE_TEMPLATE/bug-report.yml b/.github/ISSUE_TEMPLATE/bug-report.yml index 845ce7b1d..10fdbeec2 100644 --- a/.github/ISSUE_TEMPLATE/bug-report.yml +++ b/.github/ISSUE_TEMPLATE/bug-report.yml @@ -40,7 +40,7 @@ body: - type: textarea attributes: label: Scaleph Version or Branch - description: Provide SeaTunnel version or branch. + description: Provide Scaleph version or branch. placeholder: > Please provide the version or branch of Scaleph. validations: diff --git a/.github/workflows/ci-maven.yml b/.github/workflows/ci-maven.yml index 01fc1b6a9..33d6512e8 100644 --- a/.github/workflows/ci-maven.yml +++ b/.github/workflows/ci-maven.yml @@ -39,12 +39,9 @@ jobs: java-version: ${{ matrix.jdk }} distribution: temurin cache: maven - - name: Build scaleph-dist - timeout-minutes: 360 - run: mvn -B -U -T 4C clean package -Pdist -DskipTests -Dfast - name: Build with Maven timeout-minutes: 360 - run: mvn -B -U -T 4C clean package + run: mvn -B -U -T 4C clean package -Pdist -Dfast - name: Upload coverage to Codecov uses: codecov/codecov-action@v4 with: diff --git a/scaleph-application/scaleph-application-doris/src/main/java/cn/sliew/scaleph/application/doris/action/DorisOperatorInstanceStatusSyncJob.java b/scaleph-application/scaleph-application-doris/src/main/java/cn/sliew/scaleph/application/doris/action/DorisOperatorInstanceStatusSyncJob.java index 3fed93e47..dc9c8973b 100644 --- a/scaleph-application/scaleph-application-doris/src/main/java/cn/sliew/scaleph/application/doris/action/DorisOperatorInstanceStatusSyncJob.java +++ b/scaleph-application/scaleph-application-doris/src/main/java/cn/sliew/scaleph/application/doris/action/DorisOperatorInstanceStatusSyncJob.java @@ -24,6 +24,8 @@ import cn.sliew.scaleph.application.doris.operator.status.DorisClusterStatus; import cn.sliew.scaleph.workflow.engine.action.ActionContext; import cn.sliew.scaleph.workflow.engine.action.ActionResult; +import cn.sliew.scaleph.workflow.engine.action.ActionStatus; +import cn.sliew.scaleph.workflow.engine.action.DefaultActionResult; import cn.sliew.scaleph.workflow.engine.workflow.AbstractWorkFlow; import io.fabric8.kubernetes.api.model.GenericKubernetesResource; import lombok.extern.slf4j.Slf4j; @@ -46,13 +48,14 @@ public DorisOperatorInstanceStatusSyncJob() { @Override protected Runnable doExecute(ActionContext context, ActionListener listener) { - return () -> process(); + return () -> process(context, listener); } - private void process() { + private void process(ActionContext context, ActionListener listener) { List ids = wsDorisOperatorInstanceService.listAll(); ids.forEach(this::doProcess); log.debug("update doris operator instance status success! update size: {}", ids.size()); + listener.onResponse(new DefaultActionResult(ActionStatus.SUCCESS, context)); } private void doProcess(Long id) { diff --git a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/action/FlinkJobStatusSyncJob.java b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/action/FlinkJobStatusSyncJob.java index 02a109bf7..017133c9f 100644 --- a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/action/FlinkJobStatusSyncJob.java +++ b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/action/FlinkJobStatusSyncJob.java @@ -28,6 +28,8 @@ import cn.sliew.scaleph.common.dict.flink.kubernetes.DeploymentKind; import cn.sliew.scaleph.workflow.engine.action.ActionContext; import cn.sliew.scaleph.workflow.engine.action.ActionResult; +import cn.sliew.scaleph.workflow.engine.action.ActionStatus; +import cn.sliew.scaleph.workflow.engine.action.DefaultActionResult; import cn.sliew.scaleph.workflow.engine.workflow.AbstractWorkFlow; import io.fabric8.kubernetes.api.model.GenericKubernetesResource; import lombok.extern.slf4j.Slf4j; @@ -52,13 +54,14 @@ public FlinkJobStatusSyncJob() { @Override protected Runnable doExecute(ActionContext context, ActionListener listener) { - return () -> process(); + return () -> process(context, listener); } - private void process() { + private void process(ActionContext context, ActionListener listener) { List jobIds = wsFlinkKubernetesJobService.listAll(); jobIds.forEach(this::doProcess); log.debug("update flink kubernetes job status success! update size: {}", jobIds.size()); + listener.onResponse(new DefaultActionResult(ActionStatus.SUCCESS, context)); } private void doProcess(Long jobId) { diff --git a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/action/FlinkJobStatusSyncJobStepOne.java b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/action/FlinkJobStatusSyncJobStepOne.java index ae41b00fa..a6bb9c0ff 100644 --- a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/action/FlinkJobStatusSyncJobStepOne.java +++ b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/action/FlinkJobStatusSyncJobStepOne.java @@ -19,12 +19,17 @@ package cn.sliew.scaleph.application.flink.action; import cn.sliew.milky.common.filter.ActionListener; +import cn.sliew.milky.common.util.JacksonUtil; import cn.sliew.scaleph.workflow.engine.action.ActionContext; import cn.sliew.scaleph.workflow.engine.action.ActionResult; +import cn.sliew.scaleph.workflow.engine.action.ActionStatus; +import cn.sliew.scaleph.workflow.engine.action.DefaultActionResult; import cn.sliew.scaleph.workflow.engine.workflow.AbstractWorkFlow; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import java.util.Map; + @Slf4j @Component public class FlinkJobStatusSyncJobStepOne extends AbstractWorkFlow { @@ -35,11 +40,15 @@ public FlinkJobStatusSyncJobStepOne() { @Override protected Runnable doExecute(ActionContext context, ActionListener listener) { - return () -> process(); + return () -> process(context, listener); } - private void process() { - log.info("update flink kubernetes job status step-1"); + private void process(ActionContext context, ActionListener listener) { + log.info("update flink kubernetes job status step-1, globalInputs: {}, inputs: {}", + JacksonUtil.toJsonString(context.getGlobalInputs()), JacksonUtil.toJsonString(context.getInputs())); + Map outputs = context.getOutputs(); + outputs.put("output1", "value1"); + listener.onResponse(new DefaultActionResult(ActionStatus.SUCCESS, context)); } } diff --git a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/action/FlinkJobStatusSyncJobStepThreeOne.java b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/action/FlinkJobStatusSyncJobStepThreeOne.java index d68aa8685..dec9543f7 100644 --- a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/action/FlinkJobStatusSyncJobStepThreeOne.java +++ b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/action/FlinkJobStatusSyncJobStepThreeOne.java @@ -19,12 +19,17 @@ package cn.sliew.scaleph.application.flink.action; import cn.sliew.milky.common.filter.ActionListener; +import cn.sliew.milky.common.util.JacksonUtil; import cn.sliew.scaleph.workflow.engine.action.ActionContext; import cn.sliew.scaleph.workflow.engine.action.ActionResult; +import cn.sliew.scaleph.workflow.engine.action.ActionStatus; +import cn.sliew.scaleph.workflow.engine.action.DefaultActionResult; import cn.sliew.scaleph.workflow.engine.workflow.AbstractWorkFlow; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import java.util.Map; + @Slf4j @Component public class FlinkJobStatusSyncJobStepThreeOne extends AbstractWorkFlow { @@ -35,11 +40,15 @@ public FlinkJobStatusSyncJobStepThreeOne() { @Override protected Runnable doExecute(ActionContext context, ActionListener listener) { - return () -> process(); + return () -> process(context, listener); } - private void process() { - log.info("update flink kubernetes job status step-3-1"); + private void process(ActionContext context, ActionListener listener) { + log.info("update flink kubernetes job status step-3-1, globalInputs: {}, inputs: {}", + JacksonUtil.toJsonString(context.getGlobalInputs()), JacksonUtil.toJsonString(context.getInputs())); + Map outputs = context.getOutputs(); + outputs.put("output3-1", "value3-1"); + listener.onResponse(new DefaultActionResult(ActionStatus.SUCCESS, context)); } } diff --git a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/action/FlinkJobStatusSyncJobStepThreeTwo.java b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/action/FlinkJobStatusSyncJobStepThreeTwo.java index e26328161..a5adcd486 100644 --- a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/action/FlinkJobStatusSyncJobStepThreeTwo.java +++ b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/action/FlinkJobStatusSyncJobStepThreeTwo.java @@ -19,12 +19,17 @@ package cn.sliew.scaleph.application.flink.action; import cn.sliew.milky.common.filter.ActionListener; +import cn.sliew.milky.common.util.JacksonUtil; import cn.sliew.scaleph.workflow.engine.action.ActionContext; import cn.sliew.scaleph.workflow.engine.action.ActionResult; +import cn.sliew.scaleph.workflow.engine.action.ActionStatus; +import cn.sliew.scaleph.workflow.engine.action.DefaultActionResult; import cn.sliew.scaleph.workflow.engine.workflow.AbstractWorkFlow; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import java.util.Map; + @Slf4j @Component public class FlinkJobStatusSyncJobStepThreeTwo extends AbstractWorkFlow { @@ -35,11 +40,15 @@ public FlinkJobStatusSyncJobStepThreeTwo() { @Override protected Runnable doExecute(ActionContext context, ActionListener listener) { - return () -> process(); + return () -> process(context, listener); } - private void process() { - log.info("update flink kubernetes job status step-3-2"); + private void process(ActionContext context, ActionListener listener) { + log.info("update flink kubernetes job status step-3-2, globalInputs: {}, inputs: {}", + JacksonUtil.toJsonString(context.getGlobalInputs()), JacksonUtil.toJsonString(context.getInputs())); + Map outputs = context.getOutputs(); + outputs.put("output3-2", "value3-2"); + listener.onResponse(new DefaultActionResult(ActionStatus.SUCCESS, context)); } } diff --git a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/action/FlinkJobStatusSyncJobStepTwo.java b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/action/FlinkJobStatusSyncJobStepTwo.java index 52f61ba71..6dd519756 100644 --- a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/action/FlinkJobStatusSyncJobStepTwo.java +++ b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/action/FlinkJobStatusSyncJobStepTwo.java @@ -19,12 +19,17 @@ package cn.sliew.scaleph.application.flink.action; import cn.sliew.milky.common.filter.ActionListener; +import cn.sliew.milky.common.util.JacksonUtil; import cn.sliew.scaleph.workflow.engine.action.ActionContext; import cn.sliew.scaleph.workflow.engine.action.ActionResult; +import cn.sliew.scaleph.workflow.engine.action.ActionStatus; +import cn.sliew.scaleph.workflow.engine.action.DefaultActionResult; import cn.sliew.scaleph.workflow.engine.workflow.AbstractWorkFlow; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import java.util.Map; + @Slf4j @Component public class FlinkJobStatusSyncJobStepTwo extends AbstractWorkFlow { @@ -35,11 +40,15 @@ public FlinkJobStatusSyncJobStepTwo() { @Override protected Runnable doExecute(ActionContext context, ActionListener listener) { - return () -> process(); + return () -> process(context, listener); } - private void process() { - log.info("update flink kubernetes job status step-2"); + private void process(ActionContext context, ActionListener listener) { + log.info("update flink kubernetes job status step-2, globalInputs: {}, inputs: {}", + JacksonUtil.toJsonString(context.getGlobalInputs()), JacksonUtil.toJsonString(context.getInputs())); + Map outputs = context.getOutputs(); + outputs.put("output2", "value2"); + listener.onResponse(new DefaultActionResult(ActionStatus.SUCCESS, context)); } } diff --git a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/action/FlinkSessionClusterStatusSyncJob.java b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/action/FlinkSessionClusterStatusSyncJob.java index 2148917db..5210b7667 100644 --- a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/action/FlinkSessionClusterStatusSyncJob.java +++ b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/action/FlinkSessionClusterStatusSyncJob.java @@ -24,6 +24,8 @@ import cn.sliew.scaleph.application.flink.service.WsFlinkKubernetesSessionClusterService; import cn.sliew.scaleph.workflow.engine.action.ActionContext; import cn.sliew.scaleph.workflow.engine.action.ActionResult; +import cn.sliew.scaleph.workflow.engine.action.ActionStatus; +import cn.sliew.scaleph.workflow.engine.action.DefaultActionResult; import cn.sliew.scaleph.workflow.engine.workflow.AbstractWorkFlow; import io.fabric8.kubernetes.api.model.GenericKubernetesResource; import lombok.extern.slf4j.Slf4j; @@ -46,13 +48,14 @@ public FlinkSessionClusterStatusSyncJob() { @Override protected Runnable doExecute(ActionContext context, ActionListener listener) { - return () -> process(); + return () -> process(context, listener); } - private void process() { + private void process(ActionContext context, ActionListener listener) { List sessionClusterIds = wsFlinkKubernetesSessionClusterService.listAll(); sessionClusterIds.forEach(this::doProcess); log.debug("update flink kubernetes session-cluster status success! update size: {}", sessionClusterIds.size()); + listener.onResponse(new DefaultActionResult(ActionStatus.SUCCESS, context)); } private void doProcess(Long sessionClusterId) { diff --git a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/definition/job/instance/FlinkDeploymentSpecHandler.java b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/definition/job/instance/FlinkDeploymentSpecHandler.java index ac4495dad..67575e4ec 100644 --- a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/definition/job/instance/FlinkDeploymentSpecHandler.java +++ b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/definition/job/instance/FlinkDeploymentSpecHandler.java @@ -19,9 +19,9 @@ package cn.sliew.scaleph.application.flink.resource.definition.job.instance; import cn.sliew.scaleph.application.flink.operator.spec.*; -import cn.sliew.scaleph.application.flink.operator.util.TemplateMerger; import cn.sliew.scaleph.application.flink.resource.handler.*; import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesJobInstanceDTO; +import cn.sliew.scaleph.common.jackson.JsonMerger; import org.apache.commons.lang3.EnumUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -104,9 +104,9 @@ private void customFlinkMainContainer(WsFlinkKubernetesJobInstanceDTO jobInstanc } private void mergeJobInstance(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, FlinkDeploymentSpec spec) { - spec.setJobManager(TemplateMerger.merge(spec.getJobManager(), jobInstanceDTO.getJobManager(), JobManagerSpec.class)); - spec.setTaskManager(TemplateMerger.merge(spec.getTaskManager(), jobInstanceDTO.getTaskManager(), TaskManagerSpec.class)); - spec.setFlinkConfiguration(TemplateMerger.merge(spec.getFlinkConfiguration(), jobInstanceDTO.getUserFlinkConfiguration(), Map.class)); + spec.setJobManager(JsonMerger.merge(spec.getJobManager(), jobInstanceDTO.getJobManager(), JobManagerSpec.class)); + spec.setTaskManager(JsonMerger.merge(spec.getTaskManager(), jobInstanceDTO.getTaskManager(), TaskManagerSpec.class)); + spec.setFlinkConfiguration(JsonMerger.merge(spec.getFlinkConfiguration(), jobInstanceDTO.getUserFlinkConfiguration(), Map.class)); JobSpec job = spec.getJob(); if (jobInstanceDTO.getParallelism() != null) { job.setParallelism(jobInstanceDTO.getParallelism()); diff --git a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/handler/LoggingHandler.java b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/handler/LoggingHandler.java index 81b862e06..80ba8d6a9 100644 --- a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/handler/LoggingHandler.java +++ b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/handler/LoggingHandler.java @@ -20,7 +20,7 @@ import cn.sliew.scaleph.application.flink.operator.spec.FlinkDeploymentSpec; import cn.sliew.scaleph.application.flink.operator.spec.FlinkSessionClusterSpec; -import cn.sliew.scaleph.application.flink.operator.util.TemplateMerger; +import cn.sliew.scaleph.common.jackson.JsonMerger; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; @@ -33,7 +33,7 @@ public class LoggingHandler { public void handle(Map logConfiguration, FlinkDeploymentSpec spec) { Map configuration = Optional.ofNullable(spec.getFlinkConfiguration()).orElse(new HashMap<>()); - Map merge = TemplateMerger.merge(configuration, logConfiguration, Map.class); + Map merge = JsonMerger.merge(configuration, logConfiguration, Map.class); if (CollectionUtils.isEmpty(merge) == false) { spec.setLogConfiguration(null); } else { @@ -43,7 +43,7 @@ public void handle(Map logConfiguration, FlinkDeploymentSpec spe public void handle(Map logConfiguration, FlinkSessionClusterSpec spec) { Map configuration = Optional.ofNullable(spec.getFlinkConfiguration()).orElse(new HashMap<>()); - Map merge = TemplateMerger.merge(configuration, logConfiguration, Map.class); + Map merge = JsonMerger.merge(configuration, logConfiguration, Map.class); if (CollectionUtils.isEmpty(merge) == false) { spec.setLogConfiguration(null); } else { diff --git a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/handler/PodTemplateHandler.java b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/handler/PodTemplateHandler.java index 03e48e8fc..7200eb315 100644 --- a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/handler/PodTemplateHandler.java +++ b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/resource/handler/PodTemplateHandler.java @@ -20,9 +20,9 @@ import cn.sliew.scaleph.application.flink.operator.spec.FlinkDeploymentSpec; import cn.sliew.scaleph.application.flink.operator.spec.FlinkSessionClusterSpec; -import cn.sliew.scaleph.application.flink.operator.util.TemplateMerger; import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesJobDTO; import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesSessionClusterDTO; +import cn.sliew.scaleph.common.jackson.JsonMerger; import io.fabric8.kubernetes.api.model.Pod; import org.springframework.stereotype.Component; @@ -30,12 +30,12 @@ public class PodTemplateHandler { public void handle(WsFlinkKubernetesJobDTO jobDTO, FlinkDeploymentSpec spec) { - Pod merge = TemplateMerger.merge(spec.getPodTemplate(), jobDTO.getFlinkDeployment().getPodTemplate(), Pod.class); + Pod merge = JsonMerger.merge(spec.getPodTemplate(), jobDTO.getFlinkDeployment().getPodTemplate(), Pod.class); spec.setPodTemplate(merge); } public void handle(WsFlinkKubernetesSessionClusterDTO sessionClusterDTO, FlinkSessionClusterSpec spec) { - Pod merge = TemplateMerger.merge(spec.getPodTemplate(), sessionClusterDTO.getPodTemplate(), Pod.class); + Pod merge = JsonMerger.merge(spec.getPodTemplate(), sessionClusterDTO.getPodTemplate(), Pod.class); spec.setPodTemplate(merge); } } diff --git a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/service/impl/WsFlinkKubernetesJobInstanceServiceImpl.java b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/service/impl/WsFlinkKubernetesJobInstanceServiceImpl.java index 33089d32c..66f422976 100644 --- a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/service/impl/WsFlinkKubernetesJobInstanceServiceImpl.java +++ b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/service/impl/WsFlinkKubernetesJobInstanceServiceImpl.java @@ -24,7 +24,6 @@ import cn.sliew.scaleph.application.flink.operator.spec.FlinkSessionJobSpec; import cn.sliew.scaleph.application.flink.operator.spec.JobState; import cn.sliew.scaleph.application.flink.operator.status.*; -import cn.sliew.scaleph.application.flink.operator.util.TemplateMerger; import cn.sliew.scaleph.application.flink.resource.definition.job.instance.FlinkJobInstanceConverterFactory; import cn.sliew.scaleph.application.flink.resource.definition.job.instance.MetadataHandler; import cn.sliew.scaleph.application.flink.service.FlinkKubernetesOperatorService; @@ -44,6 +43,7 @@ import cn.sliew.scaleph.common.dict.flink.kubernetes.ResourceLifecycleState; import cn.sliew.scaleph.common.dict.flink.kubernetes.SavepointFormatType; import cn.sliew.scaleph.common.dict.flink.kubernetes.SavepointTriggerType; +import cn.sliew.scaleph.common.jackson.JsonMerger; import cn.sliew.scaleph.common.util.UUIDUtil; import cn.sliew.scaleph.dao.entity.master.ws.WsFlinkKubernetesJobInstance; import cn.sliew.scaleph.dao.entity.master.ws.WsFlinkKubernetesJobInstanceSavepoint; @@ -168,7 +168,7 @@ public void deploy(WsFlinkKubernetesJobInstanceDeployParam param) throws Excepti case FLINK_DEPLOYMENT: clusterCredentialId = jobDTO.getFlinkDeployment().getClusterCredentialId(); Map flinkConfiguration = jobDTO.getFlinkDeployment().getFlinkConfiguration(); - Map mergedFlinkConfiguration = TemplateMerger.merge(flinkConfiguration, param.getUserFlinkConfiguration(), Map.class); + Map mergedFlinkConfiguration = JsonMerger.merge(flinkConfiguration, param.getUserFlinkConfiguration(), Map.class); record.setMergedFlinkConfiguration(JacksonUtil.toJsonString(mergedFlinkConfiguration)); resource = Constant.FLINK_DEPLOYMENT; callbackHandler = flinkDeploymentWatchCallbackHandler; diff --git a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/service/impl/WsFlinkKubernetesTemplateServiceImpl.java b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/service/impl/WsFlinkKubernetesTemplateServiceImpl.java index 579fd58f2..779f7d23b 100644 --- a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/service/impl/WsFlinkKubernetesTemplateServiceImpl.java +++ b/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/service/impl/WsFlinkKubernetesTemplateServiceImpl.java @@ -19,16 +19,9 @@ package cn.sliew.scaleph.application.flink.service.impl; import cn.sliew.scaleph.application.flink.factory.FlinkDefaultTemplateFactory; -import cn.sliew.scaleph.common.dict.flink.FlinkJobType; -import cn.sliew.scaleph.common.dict.flink.FlinkVersion; -import cn.sliew.scaleph.common.dict.flink.kubernetes.DeploymentKind; -import cn.sliew.scaleph.common.util.UUIDUtil; -import cn.sliew.scaleph.dao.entity.master.ws.WsFlinkKubernetesTemplate; -import cn.sliew.scaleph.dao.mapper.master.ws.WsFlinkKubernetesTemplateMapper; import cn.sliew.scaleph.application.flink.operator.spec.IngressSpec; import cn.sliew.scaleph.application.flink.operator.spec.JobManagerSpec; import cn.sliew.scaleph.application.flink.operator.spec.TaskManagerSpec; -import cn.sliew.scaleph.application.flink.operator.util.TemplateMerger; import cn.sliew.scaleph.application.flink.resource.definition.template.FlinkTemplate; import cn.sliew.scaleph.application.flink.resource.definition.template.FlinkTemplateConverter; import cn.sliew.scaleph.application.flink.resource.handler.FlinkImageMapping; @@ -43,6 +36,13 @@ import cn.sliew.scaleph.application.flink.service.param.WsFlinkKubernetesTemplateListParam; import cn.sliew.scaleph.application.flink.service.param.WsFlinkKubernetesTemplateUpdateParam; import cn.sliew.scaleph.application.flink.service.vo.KubernetesOptionsVO; +import cn.sliew.scaleph.common.dict.flink.FlinkJobType; +import cn.sliew.scaleph.common.dict.flink.FlinkVersion; +import cn.sliew.scaleph.common.dict.flink.kubernetes.DeploymentKind; +import cn.sliew.scaleph.common.jackson.JsonMerger; +import cn.sliew.scaleph.common.util.UUIDUtil; +import cn.sliew.scaleph.dao.entity.master.ws.WsFlinkKubernetesTemplate; +import cn.sliew.scaleph.dao.mapper.master.ws.WsFlinkKubernetesTemplateMapper; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import io.fabric8.kubernetes.api.model.Pod; @@ -140,14 +140,14 @@ private WsFlinkKubernetesTemplateDTO doMergeDefault(WsFlinkKubernetesTemplateDTO WsFlinkKubernetesTemplateDTO result = new WsFlinkKubernetesTemplateDTO(); result.setName(dto.getName()); result.setNamespace(StringUtils.hasText(dto.getNamespace()) ? dto.getNamespace() : globalDefault.getNamespace()); - result.setKubernetesOptions(TemplateMerger.merge(globalDefault.getKubernetesOptions(), dto.getKubernetesOptions(), KubernetesOptionsVO.class)); - result.setJobManager(TemplateMerger.merge(globalDefault.getJobManager(), dto.getJobManager(), JobManagerSpec.class)); - result.setTaskManager(TemplateMerger.merge(globalDefault.getTaskManager(), dto.getTaskManager(), TaskManagerSpec.class)); - result.setPodTemplate(TemplateMerger.merge(globalDefault.getPodTemplate(), dto.getPodTemplate(), Pod.class)); - result.setFlinkConfiguration(TemplateMerger.merge(globalDefault.getFlinkConfiguration(), dto.getFlinkConfiguration(), Map.class)); - result.setLogConfiguration(TemplateMerger.merge(globalDefault.getLogConfiguration(), dto.getLogConfiguration(), Map.class)); - result.setIngress(TemplateMerger.merge(globalDefault.getIngress(), dto.getIngress(), IngressSpec.class)); - result.setAdditionalDependencies(TemplateMerger.merge(globalDefault.getAdditionalDependencies(), dto.getAdditionalDependencies(), List.class)); + result.setKubernetesOptions(JsonMerger.merge(globalDefault.getKubernetesOptions(), dto.getKubernetesOptions(), KubernetesOptionsVO.class)); + result.setJobManager(JsonMerger.merge(globalDefault.getJobManager(), dto.getJobManager(), JobManagerSpec.class)); + result.setTaskManager(JsonMerger.merge(globalDefault.getTaskManager(), dto.getTaskManager(), TaskManagerSpec.class)); + result.setPodTemplate(JsonMerger.merge(globalDefault.getPodTemplate(), dto.getPodTemplate(), Pod.class)); + result.setFlinkConfiguration(JsonMerger.merge(globalDefault.getFlinkConfiguration(), dto.getFlinkConfiguration(), Map.class)); + result.setLogConfiguration(JsonMerger.merge(globalDefault.getLogConfiguration(), dto.getLogConfiguration(), Map.class)); + result.setIngress(JsonMerger.merge(globalDefault.getIngress(), dto.getIngress(), IngressSpec.class)); + result.setAdditionalDependencies(JsonMerger.merge(globalDefault.getAdditionalDependencies(), dto.getAdditionalDependencies(), List.class)); result.setRemark(StringUtils.hasText(dto.getRemark()) ? dto.getRemark() : globalDefault.getRemark()); return result; } diff --git a/scaleph-common/pom.xml b/scaleph-common/pom.xml index e6830fd2b..639bb2880 100644 --- a/scaleph-common/pom.xml +++ b/scaleph-common/pom.xml @@ -81,6 +81,11 @@ provided + + com.flipkart.zjsonpatch + zjsonpatch + + com.lmax disruptor diff --git a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/operator/util/TemplateMerger.java b/scaleph-common/src/main/java/cn/sliew/scaleph/common/jackson/JsonMerger.java similarity index 90% rename from scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/operator/util/TemplateMerger.java rename to scaleph-common/src/main/java/cn/sliew/scaleph/common/jackson/JsonMerger.java index 71aafd469..a6b7c9d51 100644 --- a/scaleph-application/scaleph-application-flink/src/main/java/cn/sliew/scaleph/application/flink/operator/util/TemplateMerger.java +++ b/scaleph-common/src/main/java/cn/sliew/scaleph/common/jackson/JsonMerger.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package cn.sliew.scaleph.application.flink.operator.util; +package cn.sliew.scaleph.common.jackson; import cn.sliew.milky.common.util.JacksonUtil; import com.fasterxml.jackson.databind.JsonNode; @@ -28,7 +28,7 @@ import java.util.EnumSet; -public enum TemplateMerger { +public enum JsonMerger { ; public static T merge(T template, T target, Class clazz) { @@ -38,11 +38,11 @@ public static T merge(T template, T target, Class clazz) { return JacksonUtil.toObject(merged, clazz); } - private static JsonNode doMerge(JsonNode source, JsonNode target) { - if (source.isNull()) { + public static JsonNode doMerge(JsonNode source, JsonNode target) { + if (source == null || source.isNull()) { return target; } - if (target.isNull()) { + if (target == null || target.isNull()) { return source; } EnumSet flags = DiffFlags.dontNormalizeOpIntoMoveAndCopy().clone(); diff --git a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/engine/action/ActionContext.java b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/engine/action/ActionContext.java index 14ab2c0f7..7d94d0af7 100644 --- a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/engine/action/ActionContext.java +++ b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/engine/action/ActionContext.java @@ -32,6 +32,7 @@ import java.util.Collection; import java.util.Date; +import java.util.HashMap; import java.util.Map; @Data @@ -45,7 +46,9 @@ public class ActionContext implements AttributeMap { private Long workflowTaskInstanceId; - private Map params; + private Map globalInputs; + private Map inputs; + private Map outputs = new HashMap<>(); @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") private Date previousFireTime; diff --git a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/engine/action/ActionContextBuilder.java b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/engine/action/ActionContextBuilder.java index f9c0e73a6..461762a05 100644 --- a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/engine/action/ActionContextBuilder.java +++ b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/engine/action/ActionContextBuilder.java @@ -70,8 +70,14 @@ public ActionContextBuilder withFireTime(Date fireTime) { return this; } - public ActionContextBuilder withParams(Map params) { - context.setParams(params); + public ActionContextBuilder withGlobalInputs(Map globalInputs) { + context.setGlobalInputs(globalInputs); + return this; + } + + + public ActionContextBuilder withInputs(Map inputs) { + context.setInputs(inputs); return this; } diff --git a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/engine/workflow/ParallelFlow.java b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/engine/workflow/ParallelFlow.java index 7c5a55a62..beb1e5da7 100644 --- a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/engine/workflow/ParallelFlow.java +++ b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/engine/workflow/ParallelFlow.java @@ -54,6 +54,7 @@ public void onFailure(Throwable e) { } }); } + // fixme listener.onResponse(new ParallelActionResult(context, results)); } catch (Exception e) { listener.onFailure(e); diff --git a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/engine/workflow/SequentialFlow.java b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/engine/workflow/SequentialFlow.java index b75477c6d..276bf3a6c 100644 --- a/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/engine/workflow/SequentialFlow.java +++ b/scaleph-workflow/scaleph-workflow-api/src/main/java/cn/sliew/scaleph/workflow/engine/workflow/SequentialFlow.java @@ -19,9 +19,7 @@ package cn.sliew.scaleph.workflow.engine.workflow; import cn.sliew.milky.common.filter.ActionListener; -import cn.sliew.scaleph.workflow.engine.action.Action; -import cn.sliew.scaleph.workflow.engine.action.ActionContext; -import cn.sliew.scaleph.workflow.engine.action.ActionResult; +import cn.sliew.scaleph.workflow.engine.action.*; import org.apache.commons.lang3.RandomStringUtils; import java.util.ArrayList; @@ -53,7 +51,7 @@ private void sequentialExecute(int index, ActionContext context, ActionListener< action.execute(context, new ActionListener() { @Override public void onResponse(ActionResult result) { - if (index == actions.size()) { + if (index + 1 == actions.size()) { listener.onResponse(result); } else { sequentialExecute(index + 1, context, listener); diff --git a/scaleph-workflow/scaleph-workflow-simple/src/main/java/cn/sliew/scaleph/workflow/simple/listener/taskinstance/WorkflowTaskInstanceDeployEventListener.java b/scaleph-workflow/scaleph-workflow-simple/src/main/java/cn/sliew/scaleph/workflow/simple/listener/taskinstance/WorkflowTaskInstanceDeployEventListener.java index 4f0a23b92..f6281d677 100644 --- a/scaleph-workflow/scaleph-workflow-simple/src/main/java/cn/sliew/scaleph/workflow/simple/listener/taskinstance/WorkflowTaskInstanceDeployEventListener.java +++ b/scaleph-workflow/scaleph-workflow-simple/src/main/java/cn/sliew/scaleph/workflow/simple/listener/taskinstance/WorkflowTaskInstanceDeployEventListener.java @@ -21,10 +21,13 @@ import cn.sliew.milky.common.exception.Rethrower; import cn.sliew.milky.common.filter.ActionListener; import cn.sliew.milky.common.util.JacksonUtil; +import cn.sliew.scaleph.common.jackson.JsonMerger; import cn.sliew.scaleph.common.util.SpringApplicationContextUtil; import cn.sliew.scaleph.dag.service.DagConfigStepService; +import cn.sliew.scaleph.dag.service.DagInstanceComplexService; import cn.sliew.scaleph.dag.service.DagStepService; import cn.sliew.scaleph.dag.service.dto.DagConfigStepDTO; +import cn.sliew.scaleph.dag.service.dto.DagInstanceDTO; import cn.sliew.scaleph.dag.service.dto.DagStepDTO; import cn.sliew.scaleph.queue.MessageListener; import cn.sliew.scaleph.workflow.engine.Engine; @@ -34,16 +37,20 @@ import cn.sliew.scaleph.workflow.engine.action.ActionContextBuilder; import cn.sliew.scaleph.workflow.engine.action.ActionResult; import cn.sliew.scaleph.workflow.engine.workflow.ParallelFlow; +import cn.sliew.scaleph.workflow.engine.workflow.SequentialFlow; import cn.sliew.scaleph.workflow.engine.workflow.WorkFlow; import cn.sliew.scaleph.workflow.service.dto.WorkflowTaskDefinitionMeta; import cn.sliew.scaleph.workflow.simple.statemachine.WorkflowTaskInstanceStateMachine; +import com.fasterxml.jackson.databind.JsonNode; import lombok.extern.slf4j.Slf4j; import org.redisson.api.annotation.RInject; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.util.ClassUtils; import java.io.Serializable; +import java.util.Collections; import java.util.Date; +import java.util.Map; import java.util.concurrent.CompletableFuture; @MessageListener(topic = WorkflowTaskInstanceDeployEventListener.TOPIC, consumerGroup = WorkflowTaskInstanceStateMachine.CONSUMER_GROUP) @@ -59,8 +66,6 @@ protected CompletableFuture handleEventAsync(WorkflowTaskInstanceEventDTO event) future.whenCompleteAsync((unused, throwable) -> { if (throwable != null) { onFailure(event.getWorkflowTaskInstanceId(), throwable); - } else { - stateMachine.onSuccess(dagStepService.selectOne(event.getWorkflowTaskInstanceId())); } }); return future; @@ -74,9 +79,13 @@ public static class DeployRunner implements Runnable, Serializable { @RInject private String taskId; @Autowired + private DagInstanceComplexService dagInstanceComplexService; + @Autowired private DagConfigStepService dagConfigStepService; @Autowired private DagStepService dagStepService; + @Autowired + protected WorkflowTaskInstanceStateMachine stateMachine; public DeployRunner(WorkflowTaskInstanceEventDTO event) { this.event = event; @@ -92,25 +101,41 @@ public void run() { dagStepService.update(dagStepUpdateParam); DagStepDTO stepDTO = dagStepService.selectOne(event.getWorkflowTaskInstanceId()); + DagInstanceDTO dagInstanceDTO = dagInstanceComplexService.selectSimpleOne(stepDTO.getDagInstanceId()); DagConfigStepDTO configStepDTO = dagConfigStepService.selectOne(stepDTO.getDagConfigStep().getId()); WorkflowTaskDefinitionMeta workflowTaskDefinitionMeta = JacksonUtil.toObject(configStepDTO.getStepMeta(), WorkflowTaskDefinitionMeta.class); try { Class clazz = ClassUtils.forName(workflowTaskDefinitionMeta.getHandler(), ClassUtils.getDefaultClassLoader()); Action action = (Action) SpringApplicationContextUtil.getBean(clazz); - WorkFlow workFlow = ParallelFlow.newParallelFlow() + WorkFlow workFlow = SequentialFlow.newSequentialFlow() .name(configStepDTO.getStepName()) .execute(action) .build(); - ActionContext actionContext = buildActionContext(stepDTO); + ActionContext actionContext = buildActionContext(dagInstanceDTO, stepDTO); engine.run(workFlow, actionContext, new ActionListener() { @Override public void onResponse(ActionResult result) { - log.debug("workflow task {} run success!", configStepDTO.getStepName()); + try { + ActionContext context = result.getContext(); + log.debug("workflow task {} run success!, globalInputs: {}, inputs: {}, outputs: {}", + configStepDTO.getStepName(), JacksonUtil.toJsonString(context.getGlobalInputs()), JacksonUtil.toJsonString(context.getInputs()), JacksonUtil.toJsonString(context.getOutputs())); + // 记录输出 + DagStepDTO dagStepSuccessParam = new DagStepDTO(); + dagStepSuccessParam.setId(event.getWorkflowTaskInstanceId()); + dagStepSuccessParam.setOutputs(JacksonUtil.toJsonNode(context.getOutputs())); + dagStepService.update(dagStepSuccessParam); + // 通知成功 + stateMachine.onSuccess(dagStepService.selectOne(event.getWorkflowTaskInstanceId())); + } catch (Exception e) { + onFailure(e); + } } @Override public void onFailure(Throwable e) { log.error("workflow task {} run failure!", configStepDTO.getStepName(), e); + // 通知失败 + stateMachine.onFailure(dagStepService.selectOne(event.getWorkflowTaskInstanceId()), e); } }); } catch (ClassNotFoundException e) { @@ -118,11 +143,22 @@ public void onFailure(Throwable e) { } } - private ActionContext buildActionContext(DagStepDTO stepDTO) { + private ActionContext buildActionContext(DagInstanceDTO dagInstanceDTO, DagStepDTO stepDTO) { + Map globalInputs = Collections.emptyMap(); + if (dagInstanceDTO.getInputs() != null && dagInstanceDTO.getInputs().isObject()) { + globalInputs = JacksonUtil.toMap(dagInstanceDTO.getInputs()); + } + Map inputs = Collections.emptyMap(); + if (stepDTO.getInputs() != null && stepDTO.getInputs().isObject()) { + inputs = JacksonUtil.toMap(stepDTO.getInputs()); + } return ActionContextBuilder.newBuilder() + .withWorkflowDefinitionId(dagInstanceDTO.getDagConfig().getId()) .withWorkflowInstanceId(stepDTO.getDagInstanceId()) .withWorkflowTaskDefinitionId(stepDTO.getDagConfigStep().getId()) .withWorkflowTaskInstanceId(stepDTO.getId()) + .withGlobalInputs(globalInputs) + .withInputs(inputs) .validateAndBuild(); } } diff --git a/tools/docker/mysql/init.d/scaleph-dag-mysql.sql b/tools/docker/mysql/init.d/scaleph-dag-mysql.sql index b293d0a6a..6887b3fc6 100644 --- a/tools/docker/mysql/init.d/scaleph-dag-mysql.sql +++ b/tools/docker/mysql/init.d/scaleph-dag-mysql.sql @@ -36,7 +36,7 @@ INSERT INTO `dag_config`(`id`, `type`, `name`, `config_id`, `dag_meta`, `dag_att VALUES (6, 'WorkFlow', 'DorisOperatorInstanceStatusSyncJob', 'kepa00f4fdb5e8794cbb931067244caf5ef2', NULL, NULL, NULL, 'sys', 'sys'); INSERT INTO `dag_config`(`id`, `type`, `name`, `config_id`, `dag_meta`, `dag_attrs`, `remark`, `creator`, `editor`) -VALUES (7, 'WorkFlow', 'Demo', 'fssxbe099903bf174c11bf64b0d486383784', NULL, NULL, NULL, 'sys', 'sys'); +VALUES (7, 'WorkFlow', 'Demo', 'fssxbe099903bf174c11bf64b0d486383784', NULL, '{"foo":"bar"}', NULL, 'sys', 'sys'); INSERT INTO `dag_config`(`id`, `type`, `name`, `config_id`, `dag_meta`, `dag_attrs`, `intput_options`, `output_options`, `version`, `remark`, `creator`, `editor`) VALUES (8, 'SeaTunnel', 'mysql_binlog_kafka_es', 'zzbk202837c4529d47d2ab09fa7ccf84fd81', NULL, NULL, NULL, NULL, 0, @@ -150,22 +150,22 @@ INSERT INTO `dag_config_step` (`id`, `dag_id`, `step_id`, `step_name`, `position `step_meta`, `step_attrs`, `creator`, `editor`) VALUES (10, 7, 'cae1a622-6c96-4cec-81d3-883510c17702', 'FlinkJobStatus-1', 460, 400, NULL, NULL, '{\"handler\":\"cn.sliew.scaleph.application.flink.action.FlinkJobStatusSyncJobStepOne\",\"type\":\"1\"}', - NULL, 'sys', 'sys'); + '{"key1":"value1"}', 'sys', 'sys'); INSERT INTO `dag_config_step` (`id`, `dag_id`, `step_id`, `step_name`, `position_x`, `position_y`, `shape`, `style`, `step_meta`, `step_attrs`, `creator`, `editor`) VALUES (11, 7, '2c2cb6c8-794b-4cc1-8258-cd1898912744', 'FlinkJobStatus-2', 460, 400, NULL, NULL, '{\"handler\":\"cn.sliew.scaleph.application.flink.action.FlinkJobStatusSyncJobStepTwo\",\"type\":\"1\"}', - NULL, 'sys', 'sys'); + '{"key2":"value2"}', 'sys', 'sys'); INSERT INTO `dag_config_step` (`id`, `dag_id`, `step_id`, `step_name`, `position_x`, `position_y`, `shape`, `style`, `step_meta`, `step_attrs`, `creator`, `editor`) VALUES (12, 7, 'd82a947b-f414-4273-973a-06f20fe33f0d', 'FlinkJobStatus-3-1', 460, 400, NULL, NULL, '{\"handler\":\"cn.sliew.scaleph.application.flink.action.FlinkJobStatusSyncJobStepThreeOne\",\"type\":\"1\"}', - NULL, 'sys', 'sys'); + '{"key3-1":"value3-1"}', 'sys', 'sys'); INSERT INTO `dag_config_step` (`id`, `dag_id`, `step_id`, `step_name`, `position_x`, `position_y`, `shape`, `style`, `step_meta`, `step_attrs`, `creator`, `editor`) VALUES (13, 7, '027db10b-9150-403d-9d11-e4a36c99e1db', 'FlinkJobStatus-3-2', 460, 400, NULL, NULL, '{\"handler\":\"cn.sliew.scaleph.application.flink.action.FlinkJobStatusSyncJobStepThreeTwo\",\"type\":\"1\"}', - NULL, 'sys', 'sys'); + '{"key3-2":"value3-2"}', 'sys', 'sys'); INSERT INTO `dag_config_step`(`id`, `dag_id`, `step_id`, `step_name`, `position_x`, `position_y`, `shape`, `style`, `step_meta`, `step_attrs`, `creator`, `editor`) VALUES (14, 8, 'cfddc076-db37-41b1-a0f5-26430184805d', 'Kafka Source', 640, 160, NULL, NULL,