Skip to content

Commit

Permalink
feature: update flink cdc steps
Browse files Browse the repository at this point in the history
  • Loading branch information
kalencaya committed Jul 5, 2024
1 parent e0eddb2 commit 692c102
Showing 1 changed file with 41 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import cn.sliew.scaleph.dag.service.dto.DagConfigStepDTO;
import cn.sliew.scaleph.dao.entity.master.ws.WsArtifactFlinkCDC;
import cn.sliew.scaleph.dao.mapper.master.ws.WsArtifactFlinkCDCMapper;
import cn.sliew.scaleph.ds.service.DsInfoService;
import cn.sliew.scaleph.plugin.flink.cdc.FlinkCDCPipilineConnectorPlugin;
import cn.sliew.scaleph.plugin.flink.cdc.pipeline.PipelineProperties;
import cn.sliew.scaleph.plugin.flink.cdc.util.FlinkCDCPluginUtil;
Expand Down Expand Up @@ -80,6 +81,8 @@ public class WsArtifactFlinkCDCServiceImpl implements WsArtifactFlinkCDCService
private FlinkCDCConnectorService flinkCDCConnectorService;
@Autowired
private ResourceService resourceService;
@Autowired
private DsInfoService dsInfoService;

@Override
public Page<WsArtifactFlinkCDCDTO> list(WsArtifactFlinkCDCListParam param) {
Expand Down Expand Up @@ -137,111 +140,57 @@ public WsArtifactFlinkCDCDTO selectCurrent(Long artifactId) {
// Builds the Flink CDC pipeline YAML configuration for the artifact with the given id.
// NOTE(review): the jobName parameter is never used — the previous implementation passed
// it into buildEnv as the pipeline name; confirm whether it should override
// dto.getArtifact().getName() here.
public String buildConfig(Long id, Optional<String> jobName) throws Exception {
WsArtifactFlinkCDCDTO dto = selectOne(id);
ObjectNode conf = JacksonUtil.createObjectNode();

conf.set("pipeline", buildPipeline(dto));
// FIXME(review): copy-paste defect — "source", "sink", "transform" and "route" are all
// filled with the same buildPipeline(dto) output as the "pipeline" section. The
// source/sink sections presumably should come from
// buildSourceOrSink(FlinkCDCPluginType.SOURCE, dto) / buildSourceOrSink(FlinkCDCPluginType.SINK, dto);
// verify what the transform/route sections should contain before fixing.
conf.set("source", buildPipeline(dto));
conf.set("sink", buildPipeline(dto));
conf.set("transform", buildPipeline(dto));
conf.set("route", buildPipeline(dto));
return yamlMapper.writeValueAsString(conf);
}

private void buildEnvs(ObjectNode conf, String jobName, JsonNode dagAttrs) {
conf.set(FlinkCDCConstant.PIPELINE, buildEnv(jobName, dagAttrs));
/**
 * Assembles the top-level "pipeline" section of the Flink CDC config:
 * job name, parallelism and local time zone, all taken from the artifact DTO.
 *
 * @param dto the Flink CDC artifact whose settings populate the section
 * @return the populated "pipeline" node
 */
private ObjectNode buildPipeline(WsArtifactFlinkCDCDTO dto) {
    // ObjectNode#put returns the node itself, so the three entries can be chained;
    // insertion order is preserved in the serialized YAML.
    return JacksonUtil.createObjectNode()
            .put(PipelineProperties.NAME.getName(), dto.getArtifact().getName())
            .put(PipelineProperties.PARALLELISM.getName(), dto.getParallelism())
            .put(PipelineProperties.LOCAL_TIME_ZONE.getName(), dto.getLocalTimeZone());
}

/**
 * Builds the env node for a pipeline: the job name plus every attribute
 * carried on the DAG configuration, copied in verbatim.
 *
 * @param jobName  value recorded under {@code PipelineProperties.NAME}
 * @param dagAttrs DAG attributes to merge into the node; may be null or empty
 * @return the assembled env node
 */
private ObjectNode buildEnv(String jobName, JsonNode dagAttrs) {
    ObjectNode env = JacksonUtil.createObjectNode();
    env.put(PipelineProperties.NAME.getName(), jobName);
    if (dagAttrs == null || dagAttrs.isEmpty()) {
        return env;
    }
    // set(...) replaces the deprecated ObjectNode.put(String, JsonNode) overload;
    // forEachRemaining replaces the manual Iterator loop.
    dagAttrs.fields().forEachRemaining(entry -> env.set(entry.getKey(), entry.getValue()));
    return env;
}

private MutableGraph<ObjectNode> buildGraph(DagConfigComplexDTO dag) throws PluginException {
MutableGraph<ObjectNode> graph = GraphBuilder.directed().build();
List<DagConfigStepDTO> steps = dag.getSteps();
List<DagConfigLinkDTO> links = dag.getLinks();
if (CollectionUtils.isEmpty(steps)) {
return graph;
}
Map<String, ObjectNode> stepMap = new HashMap<>();
for (DagConfigStepDTO step : steps) {
Properties properties = mergeJobAttrs(step);
FlinkCDCPluginType stepType = FlinkCDCPluginType.of(step.getStepMeta().get("type").asText());
FlinkCDCPluginName stepName = FlinkCDCPluginName.of(step.getStepMeta().get("name").asText());
FlinkCDCPipilineConnectorPlugin connector = flinkCDCConnectorService.newConnector(FlinkCDCPluginUtil.getIdentity(stepType, stepName), properties);
ObjectNode stepConf = connector.createConf();
stepConf.put(GraphConstants.NODE_ID, step.getId());
stepConf.put(GraphConstants.NODE_TYPE, stepType.getValue());
stepMap.put(step.getStepId(), stepConf);
graph.addNode(stepConf);
private ObjectNode buildSourceOrSink(FlinkCDCPluginType pluginType, WsArtifactFlinkCDCDTO dto) {
ObjectNode connector = JacksonUtil.createObjectNode();
Properties properties = new Properties();
switch (pluginType) {
case SOURCE:
properties = mergeJobAttrs(pluginType, stepName, dto.getFromDsConfig());
break;
case SINK:
properties = mergeJobAttrs(pluginType, stepName, dto.getToDsConfig());
break;
default:
}
links.forEach(link -> graph.putEdge(stepMap.get(link.getFromStepId()), stepMap.get(link.getToStepId())));
return graph;
FlinkCDCPipilineConnectorPlugin connectorPlugin = flinkCDCConnectorService.newConnector(FlinkCDCPluginUtil.getIdentity(pluginType, stepName), properties);
return connectorPlugin.createConf();
}

private void buildNodes(ObjectNode conf, Set<ObjectNode> nodes) {
ArrayNode sourceConf = JacksonUtil.createArrayNode();
ArrayNode sinkConf = JacksonUtil.createArrayNode();
ArrayNode transformConf = JacksonUtil.createArrayNode();
ArrayNode routeConf = JacksonUtil.createArrayNode();

nodes.forEach(node -> {
String nodeType = node.get(GraphConstants.NODE_TYPE).asText();
FlinkCDCPluginType stepType = FlinkCDCPluginType.of(nodeType);
switch (stepType) {
case SOURCE:
sourceConf.add(node);
break;
case SINK:
sinkConf.add(node);
break;
case TRANSFORM:
transformConf.add(node);
break;
case ROUTE:
routeConf.add(node);
break;
default:
private Properties mergeJobAttrs(FlinkCDCPluginType pluginType, FlinkCDCPluginName stepName, JsonNode config) throws PluginException {
Properties properties = PropertyUtil.mapToProperties(JacksonUtil.toObject(config, new TypeReference<Map<String, Object>>() {
}));
FlinkCDCPipilineConnectorPlugin connector = flinkCDCConnectorService.getConnector(pluginType, stepName);
for (ResourceProperty resource : connector.getRequiredResources()) {
String name = resource.getProperty().getName();
if (properties.containsKey(name)) {
Object property = properties.get(name);
// fixme force conform property to resource id
Object value = resourceService.getRaw(resource.getType(), Long.valueOf(property.toString()));
properties.put(name, JacksonUtil.toJsonString(value));
}
});

conf.set(FlinkCDCPluginType.SOURCE.getValue(), sourceConf);
conf.set(FlinkCDCPluginType.SINK.getValue(), sinkConf);
if (transformConf.isEmpty() == false) {
conf.set(FlinkCDCPluginType.TRANSFORM.getValue(), transformConf);
}
if (routeConf.isEmpty() == false) {
conf.set(FlinkCDCPluginType.ROUTE.getValue(), routeConf);
}
return properties;
}

/**
 * Hook for propagating table wiring along DAG edges.
 *
 * Intentionally a no-op: the previously commented-out body was copied from the
 * SeaTunnel implementation (RESULT_TABLE_NAME / SOURCE_TABLE_NAME wiring) and is
 * presumably not applicable to Flink CDC pipeline configs — removed as dead code
 * rather than kept commented out. Restore from history if edge wiring is needed.
 */
private void buildEdges(Set<EndpointPair<ObjectNode>> edges) {
    // no-op
}

/**
 * Strips the internal graph bookkeeping fields (node type and node id) from
 * every step node so they do not leak into the serialized configuration.
 *
 * @param nodes step configuration nodes to clean in place
 */
private void clearUtiltyField(Set<ObjectNode> nodes) {
    for (ObjectNode node : nodes) {
        node.remove(GraphConstants.NODE_TYPE);
        node.remove(GraphConstants.NODE_ID);
    }
}

private Properties mergeJobAttrs(DagConfigStepDTO step) throws PluginException {
Properties properties = PropertyUtil.mapToProperties(JacksonUtil.toObject(step.getStepAttrs(), new TypeReference<Map<String, Object>>() {
private Properties mergeJobAttrs(FlinkCDCPluginType pluginType, FlinkCDCPluginName stepName, JsonNode config) throws PluginException {
Properties properties = PropertyUtil.mapToProperties(JacksonUtil.toObject(config, new TypeReference<Map<String, Object>>() {
}));
FlinkCDCPluginType pluginType = FlinkCDCPluginType.of(step.getStepMeta().get("type").asText());
FlinkCDCPluginName stepName = FlinkCDCPluginName.of(step.getStepMeta().get("name").asText());

FlinkCDCPipilineConnectorPlugin connector = flinkCDCConnectorService.getConnector(pluginType, stepName);
for (ResourceProperty resource : connector.getRequiredResources()) {
String name = resource.getProperty().getName();
Expand Down

0 comments on commit 692c102

Please sign in to comment.