Skip to content

Commit

Permalink
feature: update flink cdc steps
Browse files Browse the repository at this point in the history
  • Loading branch information
kalencaya committed Jul 7, 2024
1 parent 692c102 commit 0917a4a
Show file tree
Hide file tree
Showing 12 changed files with 217 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.springframework.web.bind.annotation.*;

import java.util.List;
import java.util.Optional;

@Tag(name = "Artifact管理-Flink-CDC")
@RestController
Expand Down Expand Up @@ -118,10 +117,10 @@ public ResponseEntity<ResponseVO> deleteArtifact(@PathVariable("artifactId") Lon
}

@Logging
@GetMapping("{id}/preview")
@PostMapping("preview")
@Operation(summary = "预览 flink cdc 配置", description = "预览 flink cdc 配置")
public ResponseEntity<ResponseVO<String>> previewJob(@PathVariable("id") Long id) throws Exception {
String conf = wsArtifactFlinkCDCService.buildConfig(id, Optional.empty());
public ResponseEntity<ResponseVO<String>> previewJob(@RequestBody WsArtifactFlinkCDCDTO dto) throws Exception {
String conf = wsArtifactFlinkCDCService.buildConfig(dto);
return new ResponseEntity<>(ResponseVO.success(conf), HttpStatus.OK);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@
@JsonFormat(shape = JsonFormat.Shape.OBJECT)
public enum FlinkCDCPluginName implements DictInstance {

MYSQL("MySQL", "MySQL"),
MYSQL("mysql", "MySQL"),

DORIS("Doris", "Doris"),
STARROCKS("StarRocks", "StarRocks"),
KAFKA("kafka", "Kafka"),
DORIS("doris", "Doris"),
STARROCKS("starrocks", "StarRocks"),
PAIMON("paimon", "Paimon"),

ROUTE("Route", "Route"),
;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,19 @@ public DorisSinkPlugin() {
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(CommonProperties.NAME);
props.add(CommonProperties.TYPE);
props.add(FENODES);
props.add(BENODES);
props.add(JDBC_URL);
props.add(USERNAME);
props.add(PASSWORD);
props.add(AUTO_REDIRECT);
props.add(SINK_ENABLE_BATCH_MODE);
props.add(SINK_FLUSH_QUEUE_SIZE);
props.add(SINK_BUFFER_FLUSH_MAX_ROWS);
props.add(SINK_BUFFER_FLUSH_MAX_BYTES);
props.add(SINK_BUFFER_FLUSH_INTERVAL);
props.add(SINK_PROPERTIES);
props.add(TABLE_CREATE_PROPERTIES);
// props.add(FENODES);
// props.add(BENODES);
// props.add(JDBC_URL);
// props.add(USERNAME);
// props.add(PASSWORD);
// props.add(AUTO_REDIRECT);
// props.add(SINK_ENABLE_BATCH_MODE);
// props.add(SINK_FLUSH_QUEUE_SIZE);
// props.add(SINK_BUFFER_FLUSH_MAX_ROWS);
// props.add(SINK_BUFFER_FLUSH_MAX_BYTES);
// props.add(SINK_BUFFER_FLUSH_INTERVAL);
// props.add(SINK_PROPERTIES);
// props.add(TABLE_CREATE_PROPERTIES);
this.supportedProperties = Collections.unmodifiableList(props);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ public MySQLSourcePlugin() {
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(CommonProperties.NAME);
props.add(CommonProperties.TYPE);
props.add(HOSTNAME);
props.add(PORT);
props.add(USERNAME);
props.add(PASSWORD);
props.add(TABLES);
// props.add(HOSTNAME);
// props.add(PORT);
// props.add(USERNAME);
// props.add(PASSWORD);
// props.add(TABLES);
props.add(SCHEMA_CHANGE_ENABLED);
props.add(SERVER_ID);
props.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import {WsArtifactFlinkCDC} from "@/services/project/typings";
import {Effect, Reducer} from "umi";
import YAML from "yaml";
import {WsArtifactFlinkCDCService} from "@/services/project/WsArtifactFlinkCDCService";

export interface StateType {
instance: WsArtifactFlinkCDC | null,
instanceYaml: string | null
}

export interface ModelType {
namespace: string;

state: StateType;

effects: {
editInstance: Effect;
};

reducers: {
updateInstance: Reducer<StateType>;
};
}

const model: ModelType = {
namespace: "flinkCDCSteps",

state: {
instance: null,
instanceYaml: null,
},

effects: {
*editInstance({payload}, {call, put}) {
const {data} = yield call(WsArtifactFlinkCDCService.preview, payload);
yield put({
type: 'updateInstance',
payload: {
instance: payload,
instanceYaml: YAML.stringify(data),
}
});
},
},

reducers: {
updateInstance(state, {payload}) {
return {
...state,
instance: payload.instance,
instanceYaml: payload.instanceYaml,
};
},
},
};

export default model;
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import React from "react";
import {ProCard} from "@ant-design/pro-components";
import {useIntl} from "@umijs/max";
import DataIntegrationFlinkCDCStepConfigDataSource
from "@/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/New/Config/ConfigStepDataSource";
import DataIntegrationFlinkCDCStepConfigRoute
Expand All @@ -9,7 +8,6 @@ import DataIntegrationFlinkCDCStepConfigTransform
from "@/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/New/Config/ConfigStepTransform";

const DataIntegrationFlinkCDCStepConfig: React.FC = () => {
const intl = useIntl();

return (
<ProCard.Group direction={"column"}>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import React, {useEffect, useRef} from "react";
import {ProCard} from "@ant-design/pro-components";
import {useIntl, connect} from "@umijs/max";
import Editor, {Monaco, useMonaco} from "@monaco-editor/react";

const DataIntegrationFlinkCDCStepYaml: React.FC = (props: any) => {
const intl = useIntl();
const editorRef = useRef(null);
const monaco = useMonaco();

useEffect(() => {
// do conditional chaining
monaco?.languages.typescript.javascriptDefaults.setEagerModelSync(true);
}, [monaco]);

const handleEditorDidMount = (editor, monaco: Monaco) => {
editorRef.current = editor;
}

return (
<ProCard>
<Editor
width="730"
height="600px"
language="yaml"
theme="vs-white"
value={props.flinkCDCSteps.instanceYaml}
options={{
selectOnLineNumbers: true,
readOnly: true,
minimap: {
enabled: false
}
}}
onMount={handleEditorDidMount}
/>
</ProCard>
);
}

const mapModelToProps = ({flinkCDCSteps}: any) => ({flinkCDCSteps})
export default connect(mapModelToProps)(DataIntegrationFlinkCDCStepYaml);
Original file line number Diff line number Diff line change
@@ -1,19 +1,51 @@
import React, {useRef} from 'react';
import {PageContainer, ProCard, ProFormInstance, StepsForm} from "@ant-design/pro-components";
import {useAccess, useIntl, useLocation} from '@umijs/max';
import {WsArtifactFlinkCDC, WsArtifactFlinkCDCAddParam} from "@/services/project/typings";
import {connect, useAccess, useIntl, useLocation} from '@umijs/max';
import {WsArtifact, WsArtifactFlinkCDC, WsArtifactFlinkCDCAddParam,} from "@/services/project/typings";
import {WORKSPACE_CONF} from "@/constants/constant";
import DataIntegrationFlinkCDCStepBase from "@/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/New/BaseStepForm";
import DataIntegrationFlinkCDCStepConfig
from "@/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/New/ConfigStepForm";
import {WsArtifactFlinkCDCService} from "@/services/project/WsArtifactFlinkCDCService";
import DataIntegrationFlinkCDCStepYaml from "@/pages/Project/Workspace/DataIntegration/FlinkCDC/Steps/New/YamlStepForm";

const DataIntegrationFlinkCDCNewSteps: React.FC = () => {
const DataIntegrationFlinkCDCNewSteps: React.FC = (props: any) => {
const intl = useIntl();
const access = useAccess();
const formRef = useRef<ProFormInstance>();
const data = useLocation().state as WsArtifactFlinkCDC;
const projectId = localStorage.getItem(WORKSPACE_CONF.projectId);
const localProjectId = localStorage.getItem(WORKSPACE_CONF.projectId);

const onBaseStepFinish = (values: Record<string, any>) => {
const artifact: WsArtifact = {
projectId: localProjectId,
name: values.name,
remark: values.remark
}
const param: WsArtifactFlinkCDC = {
artifact: artifact,
parallelism: values.parallelism,
localTimeZone: values.localTimeZone,
}
editFlinkCDCConfig(param)
return Promise.resolve(true)
}

const onConfigStepFinish = (values: Record<string, any>) => {
try {
const instance: WsArtifactFlinkCDC = WsArtifactFlinkCDCService.formatData(props.flinkCDCSteps.instance, values)
editFlinkCDCConfig(instance)
} catch (unused) {
}
return Promise.resolve(true)
}

const editFlinkCDCConfig = (param: WsArtifactFlinkCDC) => {
props.dispatch({
type: 'flinkCDCSteps/editInstance',
payload: param
})
}

return (
<PageContainer title={false}>
Expand All @@ -26,7 +58,7 @@ const DataIntegrationFlinkCDCNewSteps: React.FC = () => {
}}
onFinish={(values: Record<string, any>) => {
const param: WsArtifactFlinkCDCAddParam = {
projectId: projectId,
projectId: localProjectId,
name: values.name,
parallelism: values.parallelism,
localTimeZone: values.localTimeZone,
Expand All @@ -42,25 +74,28 @@ const DataIntegrationFlinkCDCNewSteps: React.FC = () => {
<StepsForm.StepForm
name="base"
title={intl.formatMessage({id: 'pages.project.di.flink-cdc.step.base'})}
style={{width: 1000}}>
style={{width: 1000}}
onFinish={onBaseStepFinish}>
<DataIntegrationFlinkCDCStepBase/>
</StepsForm.StepForm>
<StepsForm.StepForm
name="config"
title={intl.formatMessage({id: 'pages.project.di.flink-cdc.step.config'})}
style={{width: 1000}}>
style={{width: 1000}}
onFinish={onConfigStepFinish}>
<DataIntegrationFlinkCDCStepConfig/>
</StepsForm.StepForm>
<StepsForm.StepForm
name="yaml"
title={intl.formatMessage({id: 'pages.project.di.flink-cdc.step.yaml'})}
style={{width: 1100}}>

<DataIntegrationFlinkCDCStepYaml/>
</StepsForm.StepForm>
</StepsForm>
</ProCard>
</PageContainer>
);
};

export default DataIntegrationFlinkCDCNewSteps;
const mapModelToProps = ({flinkCDCSteps}: any) => ({flinkCDCSteps})
export default connect(mapModelToProps)(DataIntegrationFlinkCDCNewSteps);
29 changes: 13 additions & 16 deletions scaleph-ui-react/src/services/project/WsArtifactFlinkCDCService.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import {PageResponse, ResponseBody} from '@/typings';
import {request} from '@umijs/max';
import {
KubernetesOptionsVO,
WsArtifactFlinkCDC,
WsArtifactFlinkCDCAddParam,
WsArtifactFlinkCDCGraphUpdateParam,
WsArtifactFlinkCDCHistoryParam,
WsArtifactFlinkCDCParam,
WsArtifactFlinkCDCSelectListParam,
WsArtifactFlinkCDCUpdateParam
WsArtifactFlinkCDCUpdateParam, WsFlinkKubernetesTemplate
} from "@/services/project/typings";

export const WsArtifactFlinkCDCService = {
Expand Down Expand Up @@ -56,9 +56,10 @@ export const WsArtifactFlinkCDCService = {
});
},

preview: async (id: number) => {
return request<ResponseBody<string>>(`${WsArtifactFlinkCDCService.url}/${id}/preview`, {
method: 'GET',
preview: async (param: WsArtifactFlinkCDC) => {
return request<ResponseBody<string>>(`${WsArtifactFlinkCDCService.url}/preview`, {
method: 'POST',
data: param
});
},

Expand All @@ -76,13 +77,6 @@ export const WsArtifactFlinkCDCService = {
});
},

updateGraph: async (param: WsArtifactFlinkCDCGraphUpdateParam) => {
return request<ResponseBody<any>>(`${WsArtifactFlinkCDCService.url}/graph`, {
method: 'POST',
data: param,
});
},

deleteOne: async (row: WsArtifactFlinkCDC) => {
return request<ResponseBody<any>>(`${WsArtifactFlinkCDCService.url}/${row.id}`, {
method: 'DELETE',
Expand All @@ -103,9 +97,12 @@ export const WsArtifactFlinkCDCService = {
});
},

getDnds: async () => {
return request<ResponseBody<Array<Record<string, any>>>>(`${WsArtifactFlinkCDCService.url}/dag/dnd`, {
method: 'GET',
});
formatData: (data: WsArtifactFlinkCDC, value: Record<string, any>) => {
data.fromDsId = value.fromDsId
data.toDsId = value.toDsId
data.transform = value.transform
data.route = value.route
return data;
},

};
6 changes: 0 additions & 6 deletions scaleph-ui-react/src/services/project/typings.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,6 @@ export type WsArtifactFlinkCDCUpdateParam = {
remark?: string;
};

export type WsArtifactFlinkCDCGraphUpdateParam = {
id: number;
name?: string;
remark?: string;
};

export type WsArtifactFlinkJar = {
id?: number;
artifact?: WsArtifact;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public interface WsArtifactFlinkCDCService {

WsArtifactFlinkCDCDTO selectCurrent(Long artifactId);

String buildConfig(Long id, Optional<String> jobName) throws Exception;
String buildConfig(WsArtifactFlinkCDCDTO dto) throws Exception;

WsArtifactFlinkCDCDTO insert(WsArtifactFlinkCDCAddParam param);

Expand Down
Loading

0 comments on commit 0917a4a

Please sign in to comment.