Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
wushengyeyouya committed Nov 29, 2022
2 parents f124e1f + 20b8fed commit 5eadbba
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.webank.wedatasphere.dss.appconn.dolphinscheduler.entity.DolphinSchedulerWorkflow;
import com.webank.wedatasphere.dss.common.entity.node.DSSNode;
import com.webank.wedatasphere.dss.common.exception.DSSRuntimeException;
import com.webank.wedatasphere.dss.standard.common.exception.operation.ExternalOperationFailedException;
import com.webank.wedatasphere.dss.workflow.conversion.entity.ConvertedRel;
import com.webank.wedatasphere.dss.workflow.conversion.entity.PreConversionRel;
import com.webank.wedatasphere.dss.workflow.conversion.operation.WorkflowToRelConverter;
Expand All @@ -30,6 +31,12 @@ public class WorkflowToDolphinSchedulerRelConverter implements WorkflowToRelConv
@Override
public ConvertedRel convertToRel(PreConversionRel rel) {
DolphinSchedulerConvertedRel dolphinSchedulerConvertedRel = (DolphinSchedulerConvertedRel)rel;
List<WorkflowNode> workflowNodes = dolphinSchedulerConvertedRel.getWorkflow().getWorkflowNodes();
for (WorkflowNode workflowNode : workflowNodes) {
if ("workflow.subflow".equals(workflowNode.getNodeType())) {
throw new ExternalOperationFailedException(90021, workflowNode.getName() + "当前不支持将subFlow节点发布到DolphinScheduler!");
}
}
Workflow dolphinSchedulerWorkflow = convertWorkflow(dolphinSchedulerConvertedRel);
dolphinSchedulerConvertedRel.setWorkflow(dolphinSchedulerWorkflow);
return dolphinSchedulerConvertedRel;
Expand Down Expand Up @@ -62,13 +69,16 @@ private DolphinSchedulerWorkflow convertWorkflow(DolphinSchedulerConvertedRel do
.collect(Collectors.toMap(WorkflowNode::getName, WorkflowNode::getId));
for (WorkflowNode workflowNode : workflow.getWorkflowNodes()) {
DSSNode node = workflowNode.getDSSNode();
if (node.getLayout() == null) {
continue;
}
DolphinSchedulerTask dolphinSchedulerTask = nodeConverter.convertNode(dolphinSchedulerConvertedRel, node);
processDefinitionJson.addTask(dolphinSchedulerTask);

DolphinSchedulerWorkflow.LocationInfo locationInfo = new DolphinSchedulerWorkflow.LocationInfo();
locationInfo.setName(node.getName());
String targetarr = node.getDependencys().stream().map(nameToIdMap::get).collect(Collectors.joining(","));
locationInfo.setTargetarr(targetarr);
String targetArr = node.getDependencys().stream().map(nameToIdMap::get).collect(Collectors.joining(","));
locationInfo.setTargetarr(targetArr);
locationInfo.setX((int)node.getLayout().getX());
locationInfo.setY((int)node.getLayout().getY());
locations.put(node.getId(), locationInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public void syncToRel(ConvertedRel convertedRel) {
DolphinSchedulerConvertedRel dolphinSchedulerConvertedRel = (DolphinSchedulerConvertedRel)convertedRel;
OrchestrationToRelConversionRequestRef requestRef = dolphinSchedulerConvertedRel.getDSSToRelConversionRequestRef();
Workflow workflow = dolphinSchedulerConvertedRel.getWorkflow();
checkSchedulerProject(workflow);
// checkSchedulerProject(workflow);
Long dolphinSchedulerWorkflowId = requestRef.getRefOrchestrationId();
DolphinSchedulerAppConn appConn = (DolphinSchedulerAppConn) dssToRelConversionOperation.getConversionService().getAppStandard().getAppConn();
OrchestrationUpdateOperation updateOperation = appConn.getOrCreateStructureStandard().getOrchestrationService(dssToRelConversionOperation.getConversionService().getAppInstance())
Expand All @@ -47,14 +47,14 @@ public void syncToRel(ConvertedRel convertedRel) {
updateOperation.updateOrchestration(ref);
}

private void checkSchedulerProject(Workflow flow) throws ExternalOperationFailedException {
List<WorkflowNode> nodes = flow.getWorkflowNodes();
for (WorkflowNode node : nodes) {
DSSNode dssNode = node.getDSSNode();
if (CollectionUtils.isEmpty(dssNode.getResources()) && dssNode.getJobContent().isEmpty()) {
throw new ExternalOperationFailedException(90021, dssNode.getName() + "节点内容不能为空");
}
}
}
// private void checkSchedulerProject(Workflow flow) throws ExternalOperationFailedException {
// List<WorkflowNode> nodes = flow.getWorkflowNodes();
// for (WorkflowNode node : nodes) {
// DSSNode dssNode = node.getDSSNode();
// if (CollectionUtils.isEmpty(dssNode.getResources()) && dssNode.getJobContent().isEmpty()) {
// throw new ExternalOperationFailedException(90021, dssNode.getName() + "节点内容不能为空");
// }
// }
// }

}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public Job build() throws Exception {
LinkisJob job = null;
String jobType = getJobType();
String[] jobTypeSplit = jobType.split("\\.");
if (jobTypeSplit.length < 3) {
if (jobTypeSplit.length < 2) {
throw new LinkisJobExecutionErrorException(90100, "This is not Linkis job type, this jobtype is " + jobType);
}
String engineType = jobTypeSplit[1];
Expand Down

0 comments on commit 5eadbba

Please sign in to comment.