Skip to content

Commit

Permalink
[Feature][scaleph-workflow] update scaleph workflow step inputs and outputs (#736)
Browse files Browse the repository at this point in the history

* feature: update flink session job handler

* feature: update flink session job handler

* feature: update dag inputs

* feature: update dag config and step inputs

* feature: update dag config and step outputs

* feature: update dag config and step outputs
  • Loading branch information
kalencaya authored Jun 10, 2024
1 parent 2329f33 commit 2d5c610
Show file tree
Hide file tree
Showing 22 changed files with 164 additions and 73 deletions.
2 changes: 1 addition & 1 deletion .github/ISSUE_TEMPLATE/bug-report.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ body:
- type: textarea
attributes:
label: Scaleph Version or Branch
description: Provide SeaTunnel version or branch.
description: Provide Scaleph version or branch.
placeholder: >
Please provide the version or branch of Scaleph.
validations:
Expand Down
5 changes: 1 addition & 4 deletions .github/workflows/ci-maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,9 @@ jobs:
java-version: ${{ matrix.jdk }}
distribution: temurin
cache: maven
- name: Build scaleph-dist
timeout-minutes: 360
run: mvn -B -U -T 4C clean package -Pdist -DskipTests -Dfast
- name: Build with Maven
timeout-minutes: 360
run: mvn -B -U -T 4C clean package
run: mvn -B -U -T 4C clean package -Pdist -Dfast
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v4
with:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import cn.sliew.scaleph.application.doris.operator.status.DorisClusterStatus;
import cn.sliew.scaleph.workflow.engine.action.ActionContext;
import cn.sliew.scaleph.workflow.engine.action.ActionResult;
import cn.sliew.scaleph.workflow.engine.action.ActionStatus;
import cn.sliew.scaleph.workflow.engine.action.DefaultActionResult;
import cn.sliew.scaleph.workflow.engine.workflow.AbstractWorkFlow;
import io.fabric8.kubernetes.api.model.GenericKubernetesResource;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -46,13 +48,14 @@ public DorisOperatorInstanceStatusSyncJob() {

@Override
protected Runnable doExecute(ActionContext context, ActionListener<ActionResult> listener) {
return () -> process();
return () -> process(context, listener);
}

private void process() {
private void process(ActionContext context, ActionListener<ActionResult> listener) {
List<Long> ids = wsDorisOperatorInstanceService.listAll();
ids.forEach(this::doProcess);
log.debug("update doris operator instance status success! update size: {}", ids.size());
listener.onResponse(new DefaultActionResult(ActionStatus.SUCCESS, context));
}

private void doProcess(Long id) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import cn.sliew.scaleph.common.dict.flink.kubernetes.DeploymentKind;
import cn.sliew.scaleph.workflow.engine.action.ActionContext;
import cn.sliew.scaleph.workflow.engine.action.ActionResult;
import cn.sliew.scaleph.workflow.engine.action.ActionStatus;
import cn.sliew.scaleph.workflow.engine.action.DefaultActionResult;
import cn.sliew.scaleph.workflow.engine.workflow.AbstractWorkFlow;
import io.fabric8.kubernetes.api.model.GenericKubernetesResource;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -52,13 +54,14 @@ public FlinkJobStatusSyncJob() {

@Override
protected Runnable doExecute(ActionContext context, ActionListener<ActionResult> listener) {
return () -> process();
return () -> process(context, listener);
}

private void process() {
private void process(ActionContext context, ActionListener<ActionResult> listener) {
List<Long> jobIds = wsFlinkKubernetesJobService.listAll();
jobIds.forEach(this::doProcess);
log.debug("update flink kubernetes job status success! update size: {}", jobIds.size());
listener.onResponse(new DefaultActionResult(ActionStatus.SUCCESS, context));
}

private void doProcess(Long jobId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,17 @@
package cn.sliew.scaleph.application.flink.action;

import cn.sliew.milky.common.filter.ActionListener;
import cn.sliew.milky.common.util.JacksonUtil;
import cn.sliew.scaleph.workflow.engine.action.ActionContext;
import cn.sliew.scaleph.workflow.engine.action.ActionResult;
import cn.sliew.scaleph.workflow.engine.action.ActionStatus;
import cn.sliew.scaleph.workflow.engine.action.DefaultActionResult;
import cn.sliew.scaleph.workflow.engine.workflow.AbstractWorkFlow;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.Map;

@Slf4j
@Component
public class FlinkJobStatusSyncJobStepOne extends AbstractWorkFlow {
Expand All @@ -35,11 +40,15 @@ public FlinkJobStatusSyncJobStepOne() {

@Override
protected Runnable doExecute(ActionContext context, ActionListener<ActionResult> listener) {
return () -> process();
return () -> process(context, listener);
}

private void process() {
log.info("update flink kubernetes job status step-1");
private void process(ActionContext context, ActionListener<ActionResult> listener) {
log.info("update flink kubernetes job status step-1, globalInputs: {}, inputs: {}",
JacksonUtil.toJsonString(context.getGlobalInputs()), JacksonUtil.toJsonString(context.getInputs()));
Map<String, Object> outputs = context.getOutputs();
outputs.put("output1", "value1");
listener.onResponse(new DefaultActionResult(ActionStatus.SUCCESS, context));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,17 @@
package cn.sliew.scaleph.application.flink.action;

import cn.sliew.milky.common.filter.ActionListener;
import cn.sliew.milky.common.util.JacksonUtil;
import cn.sliew.scaleph.workflow.engine.action.ActionContext;
import cn.sliew.scaleph.workflow.engine.action.ActionResult;
import cn.sliew.scaleph.workflow.engine.action.ActionStatus;
import cn.sliew.scaleph.workflow.engine.action.DefaultActionResult;
import cn.sliew.scaleph.workflow.engine.workflow.AbstractWorkFlow;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.Map;

@Slf4j
@Component
public class FlinkJobStatusSyncJobStepThreeOne extends AbstractWorkFlow {
Expand All @@ -35,11 +40,15 @@ public FlinkJobStatusSyncJobStepThreeOne() {

@Override
protected Runnable doExecute(ActionContext context, ActionListener<ActionResult> listener) {
return () -> process();
return () -> process(context, listener);
}

private void process() {
log.info("update flink kubernetes job status step-3-1");
private void process(ActionContext context, ActionListener<ActionResult> listener) {
log.info("update flink kubernetes job status step-3-1, globalInputs: {}, inputs: {}",
JacksonUtil.toJsonString(context.getGlobalInputs()), JacksonUtil.toJsonString(context.getInputs()));
Map<String, Object> outputs = context.getOutputs();
outputs.put("output3-1", "value3-1");
listener.onResponse(new DefaultActionResult(ActionStatus.SUCCESS, context));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,17 @@
package cn.sliew.scaleph.application.flink.action;

import cn.sliew.milky.common.filter.ActionListener;
import cn.sliew.milky.common.util.JacksonUtil;
import cn.sliew.scaleph.workflow.engine.action.ActionContext;
import cn.sliew.scaleph.workflow.engine.action.ActionResult;
import cn.sliew.scaleph.workflow.engine.action.ActionStatus;
import cn.sliew.scaleph.workflow.engine.action.DefaultActionResult;
import cn.sliew.scaleph.workflow.engine.workflow.AbstractWorkFlow;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.Map;

@Slf4j
@Component
public class FlinkJobStatusSyncJobStepThreeTwo extends AbstractWorkFlow {
Expand All @@ -35,11 +40,15 @@ public FlinkJobStatusSyncJobStepThreeTwo() {

@Override
protected Runnable doExecute(ActionContext context, ActionListener<ActionResult> listener) {
return () -> process();
return () -> process(context, listener);
}

private void process() {
log.info("update flink kubernetes job status step-3-2");
private void process(ActionContext context, ActionListener<ActionResult> listener) {
log.info("update flink kubernetes job status step-3-2, globalInputs: {}, inputs: {}",
JacksonUtil.toJsonString(context.getGlobalInputs()), JacksonUtil.toJsonString(context.getInputs()));
Map<String, Object> outputs = context.getOutputs();
outputs.put("output3-2", "value3-2");
listener.onResponse(new DefaultActionResult(ActionStatus.SUCCESS, context));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,17 @@
package cn.sliew.scaleph.application.flink.action;

import cn.sliew.milky.common.filter.ActionListener;
import cn.sliew.milky.common.util.JacksonUtil;
import cn.sliew.scaleph.workflow.engine.action.ActionContext;
import cn.sliew.scaleph.workflow.engine.action.ActionResult;
import cn.sliew.scaleph.workflow.engine.action.ActionStatus;
import cn.sliew.scaleph.workflow.engine.action.DefaultActionResult;
import cn.sliew.scaleph.workflow.engine.workflow.AbstractWorkFlow;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.Map;

@Slf4j
@Component
public class FlinkJobStatusSyncJobStepTwo extends AbstractWorkFlow {
Expand All @@ -35,11 +40,15 @@ public FlinkJobStatusSyncJobStepTwo() {

@Override
protected Runnable doExecute(ActionContext context, ActionListener<ActionResult> listener) {
return () -> process();
return () -> process(context, listener);
}

private void process() {
log.info("update flink kubernetes job status step-2");
private void process(ActionContext context, ActionListener<ActionResult> listener) {
log.info("update flink kubernetes job status step-2, globalInputs: {}, inputs: {}",
JacksonUtil.toJsonString(context.getGlobalInputs()), JacksonUtil.toJsonString(context.getInputs()));
Map<String, Object> outputs = context.getOutputs();
outputs.put("output2", "value2");
listener.onResponse(new DefaultActionResult(ActionStatus.SUCCESS, context));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import cn.sliew.scaleph.application.flink.service.WsFlinkKubernetesSessionClusterService;
import cn.sliew.scaleph.workflow.engine.action.ActionContext;
import cn.sliew.scaleph.workflow.engine.action.ActionResult;
import cn.sliew.scaleph.workflow.engine.action.ActionStatus;
import cn.sliew.scaleph.workflow.engine.action.DefaultActionResult;
import cn.sliew.scaleph.workflow.engine.workflow.AbstractWorkFlow;
import io.fabric8.kubernetes.api.model.GenericKubernetesResource;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -46,13 +48,14 @@ public FlinkSessionClusterStatusSyncJob() {

@Override
protected Runnable doExecute(ActionContext context, ActionListener<ActionResult> listener) {
return () -> process();
return () -> process(context, listener);
}

private void process() {
private void process(ActionContext context, ActionListener<ActionResult> listener) {
List<Long> sessionClusterIds = wsFlinkKubernetesSessionClusterService.listAll();
sessionClusterIds.forEach(this::doProcess);
log.debug("update flink kubernetes session-cluster status success! update size: {}", sessionClusterIds.size());
listener.onResponse(new DefaultActionResult(ActionStatus.SUCCESS, context));
}

private void doProcess(Long sessionClusterId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
package cn.sliew.scaleph.application.flink.resource.definition.job.instance;

import cn.sliew.scaleph.application.flink.operator.spec.*;
import cn.sliew.scaleph.application.flink.operator.util.TemplateMerger;
import cn.sliew.scaleph.application.flink.resource.handler.*;
import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesJobInstanceDTO;
import cn.sliew.scaleph.common.jackson.JsonMerger;
import org.apache.commons.lang3.EnumUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
Expand Down Expand Up @@ -104,9 +104,9 @@ private void customFlinkMainContainer(WsFlinkKubernetesJobInstanceDTO jobInstanc
}

private void mergeJobInstance(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, FlinkDeploymentSpec spec) {
spec.setJobManager(TemplateMerger.merge(spec.getJobManager(), jobInstanceDTO.getJobManager(), JobManagerSpec.class));
spec.setTaskManager(TemplateMerger.merge(spec.getTaskManager(), jobInstanceDTO.getTaskManager(), TaskManagerSpec.class));
spec.setFlinkConfiguration(TemplateMerger.merge(spec.getFlinkConfiguration(), jobInstanceDTO.getUserFlinkConfiguration(), Map.class));
spec.setJobManager(JsonMerger.merge(spec.getJobManager(), jobInstanceDTO.getJobManager(), JobManagerSpec.class));
spec.setTaskManager(JsonMerger.merge(spec.getTaskManager(), jobInstanceDTO.getTaskManager(), TaskManagerSpec.class));
spec.setFlinkConfiguration(JsonMerger.merge(spec.getFlinkConfiguration(), jobInstanceDTO.getUserFlinkConfiguration(), Map.class));
JobSpec job = spec.getJob();
if (jobInstanceDTO.getParallelism() != null) {
job.setParallelism(jobInstanceDTO.getParallelism());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import cn.sliew.scaleph.application.flink.operator.spec.FlinkDeploymentSpec;
import cn.sliew.scaleph.application.flink.operator.spec.FlinkSessionClusterSpec;
import cn.sliew.scaleph.application.flink.operator.util.TemplateMerger;
import cn.sliew.scaleph.common.jackson.JsonMerger;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

Expand All @@ -33,7 +33,7 @@ public class LoggingHandler {

public void handle(Map<String, String> logConfiguration, FlinkDeploymentSpec spec) {
Map<String, String> configuration = Optional.ofNullable(spec.getFlinkConfiguration()).orElse(new HashMap<>());
Map<String, String> merge = TemplateMerger.merge(configuration, logConfiguration, Map.class);
Map<String, String> merge = JsonMerger.merge(configuration, logConfiguration, Map.class);
if (CollectionUtils.isEmpty(merge) == false) {
spec.setLogConfiguration(null);
} else {
Expand All @@ -43,7 +43,7 @@ public void handle(Map<String, String> logConfiguration, FlinkDeploymentSpec spe

public void handle(Map<String, String> logConfiguration, FlinkSessionClusterSpec spec) {
Map<String, String> configuration = Optional.ofNullable(spec.getFlinkConfiguration()).orElse(new HashMap<>());
Map<String, String> merge = TemplateMerger.merge(configuration, logConfiguration, Map.class);
Map<String, String> merge = JsonMerger.merge(configuration, logConfiguration, Map.class);
if (CollectionUtils.isEmpty(merge) == false) {
spec.setLogConfiguration(null);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,22 @@

import cn.sliew.scaleph.application.flink.operator.spec.FlinkDeploymentSpec;
import cn.sliew.scaleph.application.flink.operator.spec.FlinkSessionClusterSpec;
import cn.sliew.scaleph.application.flink.operator.util.TemplateMerger;
import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesJobDTO;
import cn.sliew.scaleph.application.flink.service.dto.WsFlinkKubernetesSessionClusterDTO;
import cn.sliew.scaleph.common.jackson.JsonMerger;
import io.fabric8.kubernetes.api.model.Pod;
import org.springframework.stereotype.Component;

@Component
public class PodTemplateHandler {

public void handle(WsFlinkKubernetesJobDTO jobDTO, FlinkDeploymentSpec spec) {
Pod merge = TemplateMerger.merge(spec.getPodTemplate(), jobDTO.getFlinkDeployment().getPodTemplate(), Pod.class);
Pod merge = JsonMerger.merge(spec.getPodTemplate(), jobDTO.getFlinkDeployment().getPodTemplate(), Pod.class);
spec.setPodTemplate(merge);
}

public void handle(WsFlinkKubernetesSessionClusterDTO sessionClusterDTO, FlinkSessionClusterSpec spec) {
Pod merge = TemplateMerger.merge(spec.getPodTemplate(), sessionClusterDTO.getPodTemplate(), Pod.class);
Pod merge = JsonMerger.merge(spec.getPodTemplate(), sessionClusterDTO.getPodTemplate(), Pod.class);
spec.setPodTemplate(merge);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import cn.sliew.scaleph.application.flink.operator.spec.FlinkSessionJobSpec;
import cn.sliew.scaleph.application.flink.operator.spec.JobState;
import cn.sliew.scaleph.application.flink.operator.status.*;
import cn.sliew.scaleph.application.flink.operator.util.TemplateMerger;
import cn.sliew.scaleph.application.flink.resource.definition.job.instance.FlinkJobInstanceConverterFactory;
import cn.sliew.scaleph.application.flink.resource.definition.job.instance.MetadataHandler;
import cn.sliew.scaleph.application.flink.service.FlinkKubernetesOperatorService;
Expand All @@ -44,6 +43,7 @@
import cn.sliew.scaleph.common.dict.flink.kubernetes.ResourceLifecycleState;
import cn.sliew.scaleph.common.dict.flink.kubernetes.SavepointFormatType;
import cn.sliew.scaleph.common.dict.flink.kubernetes.SavepointTriggerType;
import cn.sliew.scaleph.common.jackson.JsonMerger;
import cn.sliew.scaleph.common.util.UUIDUtil;
import cn.sliew.scaleph.dao.entity.master.ws.WsFlinkKubernetesJobInstance;
import cn.sliew.scaleph.dao.entity.master.ws.WsFlinkKubernetesJobInstanceSavepoint;
Expand Down Expand Up @@ -168,7 +168,7 @@ public void deploy(WsFlinkKubernetesJobInstanceDeployParam param) throws Excepti
case FLINK_DEPLOYMENT:
clusterCredentialId = jobDTO.getFlinkDeployment().getClusterCredentialId();
Map<String, String> flinkConfiguration = jobDTO.getFlinkDeployment().getFlinkConfiguration();
Map<String, String> mergedFlinkConfiguration = TemplateMerger.merge(flinkConfiguration, param.getUserFlinkConfiguration(), Map.class);
Map<String, String> mergedFlinkConfiguration = JsonMerger.merge(flinkConfiguration, param.getUserFlinkConfiguration(), Map.class);
record.setMergedFlinkConfiguration(JacksonUtil.toJsonString(mergedFlinkConfiguration));
resource = Constant.FLINK_DEPLOYMENT;
callbackHandler = flinkDeploymentWatchCallbackHandler;
Expand Down
Loading

0 comments on commit 2d5c610

Please sign in to comment.