diff --git a/dss-appconn/appconns/dss-dolphinscheduler-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/dolphinscheduler/conversion/WorkflowToDolphinSchedulerRelConverter.java b/dss-appconn/appconns/dss-dolphinscheduler-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/dolphinscheduler/conversion/WorkflowToDolphinSchedulerRelConverter.java index 5351256f7d..2a3df0893f 100644 --- a/dss-appconn/appconns/dss-dolphinscheduler-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/dolphinscheduler/conversion/WorkflowToDolphinSchedulerRelConverter.java +++ b/dss-appconn/appconns/dss-dolphinscheduler-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/dolphinscheduler/conversion/WorkflowToDolphinSchedulerRelConverter.java @@ -5,6 +5,7 @@ import com.webank.wedatasphere.dss.appconn.dolphinscheduler.entity.DolphinSchedulerWorkflow; import com.webank.wedatasphere.dss.common.entity.node.DSSNode; import com.webank.wedatasphere.dss.common.exception.DSSRuntimeException; +import com.webank.wedatasphere.dss.standard.common.exception.operation.ExternalOperationFailedException; import com.webank.wedatasphere.dss.workflow.conversion.entity.ConvertedRel; import com.webank.wedatasphere.dss.workflow.conversion.entity.PreConversionRel; import com.webank.wedatasphere.dss.workflow.conversion.operation.WorkflowToRelConverter; @@ -30,6 +31,12 @@ public class WorkflowToDolphinSchedulerRelConverter implements WorkflowToRelConv @Override public ConvertedRel convertToRel(PreConversionRel rel) { DolphinSchedulerConvertedRel dolphinSchedulerConvertedRel = (DolphinSchedulerConvertedRel)rel; + List workflowNodes = dolphinSchedulerConvertedRel.getWorkflow().getWorkflowNodes(); + for (WorkflowNode workflowNode : workflowNodes) { + if ("workflow.subflow".equals(workflowNode.getNodeType())) { + throw new ExternalOperationFailedException(90021, workflowNode.getName() + "当前不支持将subFlow节点发布到DolphinScheduler!"); + } + } Workflow dolphinSchedulerWorkflow = convertWorkflow(dolphinSchedulerConvertedRel); dolphinSchedulerConvertedRel.setWorkflow(dolphinSchedulerWorkflow); return dolphinSchedulerConvertedRel; @@ -62,13 +69,16 @@ private DolphinSchedulerWorkflow convertWorkflow(DolphinSchedulerConvertedRel do .collect(Collectors.toMap(WorkflowNode::getName, WorkflowNode::getId)); for (WorkflowNode workflowNode : workflow.getWorkflowNodes()) { DSSNode node = workflowNode.getDSSNode(); + if (node.getLayout() == null) { + continue; + } DolphinSchedulerTask dolphinSchedulerTask = nodeConverter.convertNode(dolphinSchedulerConvertedRel, node); processDefinitionJson.addTask(dolphinSchedulerTask); DolphinSchedulerWorkflow.LocationInfo locationInfo = new DolphinSchedulerWorkflow.LocationInfo(); locationInfo.setName(node.getName()); - String targetarr = node.getDependencys().stream().map(nameToIdMap::get).collect(Collectors.joining(",")); - locationInfo.setTargetarr(targetarr); + String targetArr = node.getDependencys().stream().map(nameToIdMap::get).collect(Collectors.joining(",")); + locationInfo.setTargetarr(targetArr); locationInfo.setX((int)node.getLayout().getX()); locationInfo.setY((int)node.getLayout().getY()); locations.put(node.getId(), locationInfo); diff --git a/dss-appconn/appconns/dss-dolphinscheduler-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/dolphinscheduler/conversion/WorkflowToDolphinSchedulerSynchronizer.java b/dss-appconn/appconns/dss-dolphinscheduler-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/dolphinscheduler/conversion/WorkflowToDolphinSchedulerSynchronizer.java index 443a28cf1c..ecff05d7fb 100644 --- a/dss-appconn/appconns/dss-dolphinscheduler-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/dolphinscheduler/conversion/WorkflowToDolphinSchedulerSynchronizer.java +++ b/dss-appconn/appconns/dss-dolphinscheduler-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/dolphinscheduler/conversion/WorkflowToDolphinSchedulerSynchronizer.java @@ -36,7 +36,7 @@ public void syncToRel(ConvertedRel convertedRel) { DolphinSchedulerConvertedRel dolphinSchedulerConvertedRel = (DolphinSchedulerConvertedRel)convertedRel; OrchestrationToRelConversionRequestRef requestRef = dolphinSchedulerConvertedRel.getDSSToRelConversionRequestRef(); Workflow workflow = dolphinSchedulerConvertedRel.getWorkflow(); - checkSchedulerProject(workflow); +// checkSchedulerProject(workflow); Long dolphinSchedulerWorkflowId = requestRef.getRefOrchestrationId(); DolphinSchedulerAppConn appConn = (DolphinSchedulerAppConn) dssToRelConversionOperation.getConversionService().getAppStandard().getAppConn(); OrchestrationUpdateOperation updateOperation = appConn.getOrCreateStructureStandard().getOrchestrationService(dssToRelConversionOperation.getConversionService().getAppInstance()) @@ -47,14 +47,14 @@ public void syncToRel(ConvertedRel convertedRel) { updateOperation.updateOrchestration(ref); } - private void checkSchedulerProject(Workflow flow) throws ExternalOperationFailedException { - List nodes = flow.getWorkflowNodes(); - for (WorkflowNode node : nodes) { - DSSNode dssNode = node.getDSSNode(); - if (CollectionUtils.isEmpty(dssNode.getResources()) && dssNode.getJobContent().isEmpty()) { - throw new ExternalOperationFailedException(90021, dssNode.getName() + "节点内容不能为空"); - } - } - } +// private void checkSchedulerProject(Workflow flow) throws ExternalOperationFailedException { +// List nodes = flow.getWorkflowNodes(); +// for (WorkflowNode node : nodes) { +// DSSNode dssNode = node.getDSSNode(); +// if (CollectionUtils.isEmpty(dssNode.getResources()) && dssNode.getJobContent().isEmpty()) { +// throw new ExternalOperationFailedException(90021, dssNode.getName() + "节点内容不能为空"); +// } +// } +// } } diff --git a/dss-orchestrator/orchestrators/dss-workflow/dss-linkis-node-execution/src/main/java/com/webank/wedatasphere/dss/linkis/node/execution/job/Builder.java b/dss-orchestrator/orchestrators/dss-workflow/dss-linkis-node-execution/src/main/java/com/webank/wedatasphere/dss/linkis/node/execution/job/Builder.java index 384c2bf685..bc36a4a7b7 100644 --- a/dss-orchestrator/orchestrators/dss-workflow/dss-linkis-node-execution/src/main/java/com/webank/wedatasphere/dss/linkis/node/execution/job/Builder.java +++ b/dss-orchestrator/orchestrators/dss-workflow/dss-linkis-node-execution/src/main/java/com/webank/wedatasphere/dss/linkis/node/execution/job/Builder.java @@ -55,7 +55,7 @@ public Job build() throws Exception { LinkisJob job = null; String jobType = getJobType(); String[] jobTypeSplit = jobType.split("\\."); - if (jobTypeSplit.length < 3) { + if (jobTypeSplit.length < 2) { throw new LinkisJobExecutionErrorException(90100, "This is not Linkis job type, this jobtype is " + jobType); } String engineType = jobTypeSplit[1];