From 48005cb3d0a38694c28465e62ba65a9091e84bfd Mon Sep 17 00:00:00 2001 From: wangqi Date: Tue, 21 May 2024 13:08:06 +0800 Subject: [PATCH 1/5] fix: kafka params error --- pom.xml | 12 + scaleph-datasource/pom.xml | 4 +- .../scaleph/ds/gravitino/GravitinoConfig.java | 102 ++++---- .../ds/gravitino/GravitinoInitializer.java | 236 +++++++++--------- .../Dag/components/node/steps/helper.ts | 45 ++-- .../docker/mysql/init.d/scaleph-dag-mysql.sql | 2 +- 6 files changed, 199 insertions(+), 202 deletions(-) diff --git a/pom.xml b/pom.xml index 6d76e28d3..3638eb601 100644 --- a/pom.xml +++ b/pom.xml @@ -795,6 +795,18 @@ org.apache.logging.log4j log4j-slf4j2-impl + + org.apache.logging.log4j + log4j-api + + + org.apache.logging.log4j + log4j-core + + + org.apache.logging.log4j + log4j-1.2-api + diff --git a/scaleph-datasource/pom.xml b/scaleph-datasource/pom.xml index b543c71fc..10b23a1a5 100644 --- a/scaleph-datasource/pom.xml +++ b/scaleph-datasource/pom.xml @@ -37,14 +37,14 @@ scaleph-resource - + \ No newline at end of file diff --git a/scaleph-datasource/src/main/java/cn/sliew/scaleph/ds/gravitino/GravitinoConfig.java b/scaleph-datasource/src/main/java/cn/sliew/scaleph/ds/gravitino/GravitinoConfig.java index 90da3c7bd..eb090d1b3 100644 --- a/scaleph-datasource/src/main/java/cn/sliew/scaleph/ds/gravitino/GravitinoConfig.java +++ b/scaleph-datasource/src/main/java/cn/sliew/scaleph/ds/gravitino/GravitinoConfig.java @@ -1,51 +1,51 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.sliew.scaleph.ds.gravitino; - -import cn.sliew.scaleph.common.util.NetUtils; -import com.datastrato.gravitino.client.GravitinoAdminClient; -import com.datastrato.gravitino.client.GravitinoClient; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -@Configuration -@EnableConfigurationProperties(GravitinoProperties.class) -public class GravitinoConfig { - - @Autowired - private GravitinoProperties properties; - - @Bean - public GravitinoAdminClient gravitinoAdminClient() { - return GravitinoAdminClient.builder(NetUtils.replaceLocalhost(properties.getUrl())) - .withSimpleAuth() - .build(); - } - - /** - * fixme 必须添加 metalakeName - */ - public GravitinoClient gravitinoClient() { - return GravitinoClient.builder(NetUtils.replaceLocalhost(properties.getUrl())) - .withSimpleAuth() - .build(); - } -} +///* +// * Licensed to the Apache Software Foundation (ASF) under one +// * or more contributor license agreements. See the NOTICE file +// * distributed with this work for additional information +// * regarding copyright ownership. The ASF licenses this file +// * to you under the Apache License, Version 2.0 (the +// * "License"); you may not use this file except in compliance +// * with the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// */ +// +//package cn.sliew.scaleph.ds.gravitino; +// +//import cn.sliew.scaleph.common.util.NetUtils; +//import com.datastrato.gravitino.client.GravitinoAdminClient; +//import com.datastrato.gravitino.client.GravitinoClient; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.boot.context.properties.EnableConfigurationProperties; +//import org.springframework.context.annotation.Bean; +//import org.springframework.context.annotation.Configuration; +// +//@Configuration +//@EnableConfigurationProperties(GravitinoProperties.class) +//public class GravitinoConfig { +// +// @Autowired +// private GravitinoProperties properties; +// +// @Bean +// public GravitinoAdminClient gravitinoAdminClient() { +// return GravitinoAdminClient.builder(NetUtils.replaceLocalhost(properties.getUrl())) +// .withSimpleAuth() +// .build(); +// } +// +// /** +// * fixme 必须添加 metalakeName +// */ +// public GravitinoClient gravitinoClient() { +// return GravitinoClient.builder(NetUtils.replaceLocalhost(properties.getUrl())) +// .withSimpleAuth() +// .build(); +// } +//} diff --git a/scaleph-datasource/src/main/java/cn/sliew/scaleph/ds/gravitino/GravitinoInitializer.java b/scaleph-datasource/src/main/java/cn/sliew/scaleph/ds/gravitino/GravitinoInitializer.java index bdc3fa87e..ec50f4662 100644 --- a/scaleph-datasource/src/main/java/cn/sliew/scaleph/ds/gravitino/GravitinoInitializer.java +++ b/scaleph-datasource/src/main/java/cn/sliew/scaleph/ds/gravitino/GravitinoInitializer.java @@ -1,118 +1,118 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.sliew.scaleph.ds.gravitino; - -import cn.sliew.milky.common.util.JacksonUtil; -import cn.sliew.scaleph.common.dict.job.DataSourceType; -import cn.sliew.scaleph.ds.modal.AbstractDataSource; -import cn.sliew.scaleph.ds.modal.jdbc.JdbcDataSource; -import cn.sliew.scaleph.ds.service.DsInfoService; -import cn.sliew.scaleph.ds.service.dto.DsInfoDTO; -import com.datastrato.gravitino.Catalog; -import com.datastrato.gravitino.NameIdentifier; -import com.datastrato.gravitino.client.GravitinoAdminClient; -import com.datastrato.gravitino.client.GravitinoMetalake; -import com.fasterxml.jackson.databind.node.ObjectNode; -import org.springframework.beans.factory.InitializingBean; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -@Component -public class GravitinoInitializer implements InitializingBean { - - @Autowired - private GravitinoProperties properties; - @Autowired - private GravitinoAdminClient adminClient; - @Autowired - private DsInfoService dsInfoService; - - @Override - public void afterPropertiesSet() throws Exception { - initialize(); - } - - private void initialize() { - NameIdentifier nameIdentifier = NameIdentifier.ofMetalake(properties.getMetalake()); - - // 初始化 metalake - initMetalake(nameIdentifier); - // 初始化 catalog - initDataSource(properties.getMetalake(), DataSourceType.MYSQL); - initDataSource(properties.getMetalake(), DataSourceType.POSTGRESQL); - initDataSource(properties.getMetalake(), DataSourceType.HIVE); - initDataSource(properties.getMetalake(), DataSourceType.ICEBERG); - initDataSource(properties.getMetalake(), DataSourceType.DORIS); - initDataSource(properties.getMetalake(), DataSourceType.KAFKA); - initDataSource(properties.getMetalake(), DataSourceType.HDFS); - } - - private void initMetalake(NameIdentifier nameIdentifier) { - if (adminClient.metalakeExists(nameIdentifier) == false) { - adminClient.createMetalake(nameIdentifier, "scaleph created", Collections.emptyMap()); - } - } - - private void initDataSource(String metalakeName, DataSourceType type) { - List dsInfoDTOS = dsInfoService.listByType(type); - for (DsInfoDTO dsInfoDTO : dsInfoDTOS) { - doInitDataSource(metalakeName, type, dsInfoDTO); - } - } - - private void doInitDataSource(String metalakeName, DataSourceType type, DsInfoDTO dsInfoDTO) { - GravitinoMetalake metalake = adminClient.loadMetalake(NameIdentifier.ofMetalake(metalakeName)); - NameIdentifier catalogName = NameIdentifier.ofCatalog(metalakeName, dsInfoDTO.getName()); - if (metalake.catalogExists(catalogName) == false) { - switch (type) { - case MYSQL: - initMySQL(metalake, catalogName, dsInfoDTO); - break; - case POSTGRESQL: - break; - case HIVE: - break; - case ICEBERG: - break; - case DORIS: - break; - case KAFKA: - break; - case HDFS: - break; - default: - } - } - } - - private void initMySQL(GravitinoMetalake metalake, NameIdentifier catalogName, DsInfoDTO dsInfoDTO) { - JdbcDataSource dataSource = (JdbcDataSource) AbstractDataSource.fromDsInfo((ObjectNode) JacksonUtil.toJsonNode(dsInfoDTO)); - Map properties = new HashMap<>(); - properties.put("jdbc-driver", dataSource.getDriverClassName()); - properties.put("jdbc-url", dataSource.getUrl()); - properties.put("jdbc-user", dataSource.getUser()); - properties.put("jdbc-password", dataSource.getPassword()); - metalake.createCatalog(catalogName, Catalog.Type.RELATIONAL, "jdbc-mysql", dataSource.getRemark(), properties); - } -} +///* +// * Licensed to the Apache Software Foundation (ASF) under one +// * or more contributor license agreements. See the NOTICE file +// * distributed with this work for additional information +// * regarding copyright ownership. The ASF licenses this file +// * to you under the Apache License, Version 2.0 (the +// * "License"); you may not use this file except in compliance +// * with the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// */ +// +//package cn.sliew.scaleph.ds.gravitino; +// +//import cn.sliew.milky.common.util.JacksonUtil; +//import cn.sliew.scaleph.common.dict.job.DataSourceType; +//import cn.sliew.scaleph.ds.modal.AbstractDataSource; +//import cn.sliew.scaleph.ds.modal.jdbc.JdbcDataSource; +//import cn.sliew.scaleph.ds.service.DsInfoService; +//import cn.sliew.scaleph.ds.service.dto.DsInfoDTO; +//import com.datastrato.gravitino.Catalog; +//import com.datastrato.gravitino.NameIdentifier; +//import com.datastrato.gravitino.client.GravitinoAdminClient; +//import com.datastrato.gravitino.client.GravitinoMetalake; +//import com.fasterxml.jackson.databind.node.ObjectNode; +//import org.springframework.beans.factory.InitializingBean; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.stereotype.Component; +// +//import java.util.Collections; +//import java.util.HashMap; +//import java.util.List; +//import java.util.Map; +// +//@Component +//public class GravitinoInitializer implements InitializingBean { +// +// @Autowired +// private GravitinoProperties properties; +// @Autowired +// private GravitinoAdminClient adminClient; +// @Autowired +// private DsInfoService dsInfoService; +// +// @Override +// public void afterPropertiesSet() throws Exception { +// initialize(); +// } +// +// private void initialize() { +// NameIdentifier nameIdentifier = NameIdentifier.ofMetalake(properties.getMetalake()); +// +// // 初始化 metalake +// initMetalake(nameIdentifier); +// // 初始化 catalog +// initDataSource(properties.getMetalake(), DataSourceType.MYSQL); +// initDataSource(properties.getMetalake(), DataSourceType.POSTGRESQL); +// initDataSource(properties.getMetalake(), DataSourceType.HIVE); +// initDataSource(properties.getMetalake(), DataSourceType.ICEBERG); +// initDataSource(properties.getMetalake(), DataSourceType.DORIS); +// initDataSource(properties.getMetalake(), DataSourceType.KAFKA); +// initDataSource(properties.getMetalake(), DataSourceType.HDFS); +// } +// +// private void initMetalake(NameIdentifier nameIdentifier) { +// if (adminClient.metalakeExists(nameIdentifier) == false) { +// adminClient.createMetalake(nameIdentifier, "scaleph created", Collections.emptyMap()); +// } +// } +// +// private void initDataSource(String metalakeName, DataSourceType type) { +// List dsInfoDTOS = dsInfoService.listByType(type); +// for (DsInfoDTO dsInfoDTO : dsInfoDTOS) { +// doInitDataSource(metalakeName, type, dsInfoDTO); +// } +// } +// +// private void doInitDataSource(String metalakeName, DataSourceType type, DsInfoDTO dsInfoDTO) { +// GravitinoMetalake metalake = adminClient.loadMetalake(NameIdentifier.ofMetalake(metalakeName)); +// NameIdentifier catalogName = NameIdentifier.ofCatalog(metalakeName, dsInfoDTO.getName()); +// if (metalake.catalogExists(catalogName) == false) { +// switch (type) { +// case MYSQL: +// initMySQL(metalake, catalogName, dsInfoDTO); +// break; +// case POSTGRESQL: +// break; +// case HIVE: +// break; +// case ICEBERG: +// break; +// case DORIS: +// break; +// case KAFKA: +// break; +// case HDFS: +// break; +// default: +// } +// } +// } +// +// private void initMySQL(GravitinoMetalake metalake, NameIdentifier catalogName, DsInfoDTO dsInfoDTO) { +// JdbcDataSource dataSource = (JdbcDataSource) AbstractDataSource.fromDsInfo((ObjectNode) JacksonUtil.toJsonNode(dsInfoDTO)); +// Map properties = new HashMap<>(); +// properties.put("jdbc-driver", dataSource.getDriverClassName()); +// properties.put("jdbc-url", dataSource.getUrl()); +// properties.put("jdbc-user", dataSource.getUser()); +// properties.put("jdbc-password", dataSource.getPassword()); +// metalake.createCatalog(catalogName, Catalog.Type.RELATIONAL, "jdbc-mysql", dataSource.getRemark(), properties); +// } +//} diff --git a/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/SeaTunnel/Dag/components/node/steps/helper.ts b/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/SeaTunnel/Dag/components/node/steps/helper.ts index 3bef807e3..ad0214c48 100644 --- a/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/SeaTunnel/Dag/components/node/steps/helper.ts +++ b/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/SeaTunnel/Dag/components/node/steps/helper.ts @@ -6,7 +6,6 @@ import { FilterParams, HttpParams, IcebergParams, - KafkaParams, PulsarParams, RocketMQParams, SchemaParams, @@ -41,7 +40,9 @@ export const StepSchemaService = { values[prefix + CommonListParams.commonList]?.forEach(function (item: Record) { list.push(item[prefix + CommonListParams.commonListItem]) }); - values[param] = JSON.stringify(list) + if (list.length > 0) { + values[param] = JSON.stringify(list) + } return values }, @@ -50,7 +51,9 @@ export const StepSchemaService = { values[prefix + CommonConfigParams.commonConfig]?.forEach(function (item: Record) { configs[item[prefix + CommonConfigParams.commonConfigKey]] = item[prefix + CommonConfigParams.commonConfigValue]; }); - values[param] = JSON.stringify(configs) + if (isRecordNotEmpty(configs)) { + values[param] = JSON.stringify(configs) + } return values }, @@ -81,33 +84,6 @@ export const StepSchemaService = { return values }, - formatKafkaConf: (values: Record) => { - const config: Record = {} - values[KafkaParams.kafkaConf]?.forEach(function (item: Record) { - config[item[KafkaParams.key]] = item[KafkaParams.value]; - }); - values[KafkaParams.kafkaConfig] = JSON.stringify(config) - return values - }, - - formatKafkaPartitionKeyFields: (values: Record) => { - const partitionKeyFields: Array = [] - values.partitionKeyArray?.forEach(function (item: Record) { - partitionKeyFields.push(item.partitionKey) - }); - values[KafkaParams.partitionKeyFields] = JSON.stringify(partitionKeyFields) - return values - }, - - formatAssginPartitions: (values: Record) => { - const assignPartitions: Array = [] - values.assignPartitionArray?.forEach(function (item: Record) { - assignPartitions.push(item.assignPartition) - }); - values[KafkaParams.assignPartitions] = JSON.stringify(assignPartitions) - return values - }, - formatPulsarConf: (values: Record) => { const config: Record = {} values[PulsarParams.pulsarConfigMap]?.forEach(function (item: Record) { @@ -200,3 +176,12 @@ export const StepSchemaService = { }, }; + +function isRecordNotEmpty(record: Record): boolean { + for (const key in record) { + if (record.hasOwnProperty(key)) { + return true; + } + } + return false; +} diff --git a/tools/docker/mysql/init.d/scaleph-dag-mysql.sql b/tools/docker/mysql/init.d/scaleph-dag-mysql.sql index 6314cdee1..f8bf4c002 100644 --- a/tools/docker/mysql/init.d/scaleph-dag-mysql.sql +++ b/tools/docker/mysql/init.d/scaleph-dag-mysql.sql @@ -200,7 +200,7 @@ INSERT INTO `dag_config_step`(`id`, `dag_id`, `step_id`, `step_name`, `position_ `step_meta`, `step_attrs`, `creator`, `editor`) VALUES (19, 9, 'b125d246-a5ae-44cb-8280-ee4f8bfe9f1e', 'Kafka Sink', 370, 250, NULL, NULL, '{\"name\":\"Kafka\",\"type\":\"sink\",\"engine\":\"seatunnel\"}', - '{\"stepTitle\":\"Kafka Sink\",\"dataSourceType\":\"Kafka\",\"dataSource\":7,\"topic\":\"binlog_cdc_sample_data_e_commerce\",\"semantic\":\"AT_LEAST_ONCE\",\"format\":\"debezium_json\",\"schema\":\"{\\\"fields\\\":{}}\",\"kafka.config\":\"{}\",\"partition_key_fields\":\"[]\",\"assign_partitions\":\"[]\"}', + '{\"stepTitle\":\"Kafka Sink\",\"dataSourceType\":\"Kafka\",\"dataSource\":7,\"topic\":\"binlog_cdc_sample_data_e_commerce\",\"semantic\":\"AT_LEAST_ONCE\",\"format\":\"debezium_json\",\"schema\":\"{\\\"fields\\\":{}}\",\"kafka.config\":\"{}\"}', 'sys', 'sys'); INSERT INTO `dag_config_step`(`id`, `dag_id`, `step_id`, `step_name`, `position_x`, `position_y`, `shape`, `style`, `step_meta`, `step_attrs`, `creator`, `editor`) From 7f8713ddda31eb9cf1726fa2a5a07117662f324d Mon Sep 17 00:00:00 2001 From: wangqi Date: Tue, 21 May 2024 13:09:47 +0800 Subject: [PATCH 2/5] feature: update elasticsearch docker compose --- tools/docker/deploy/elasticsearch/docker-compose.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tools/docker/deploy/elasticsearch/docker-compose.yml b/tools/docker/deploy/elasticsearch/docker-compose.yml index d86f0db37..19fd52c24 100644 --- a/tools/docker/deploy/elasticsearch/docker-compose.yml +++ b/tools/docker/deploy/elasticsearch/docker-compose.yml @@ -39,4 +39,5 @@ services: networks: scaleph: - driver: bridge \ No newline at end of file + external: true + name: local_scaleph \ No newline at end of file From 1db55fbdbc37666254a35875911695656001685b Mon Sep 17 00:00:00 2001 From: wangqi Date: Tue, 21 May 2024 18:15:33 +0800 Subject: [PATCH 3/5] feature: update doris docker compose --- .../ds/modal/olap/DorisDataSource.java | 1 - .../doris/sink/DorisSinkPlugin.java | 2 ++ .../DataSource/Info/StepForms/Props/Doris.tsx | 1 - tools/docker/deploy/doris/README.md | 29 +++++++++++++++++++ .../deploy/elasticsearch/docker-compose.yml | 4 +-- 5 files changed, 33 insertions(+), 4 deletions(-) create mode 100644 tools/docker/deploy/doris/README.md diff --git a/scaleph-datasource/src/main/java/cn/sliew/scaleph/ds/modal/olap/DorisDataSource.java b/scaleph-datasource/src/main/java/cn/sliew/scaleph/ds/modal/olap/DorisDataSource.java index 9a24081b9..ca0f49bd7 100644 --- a/scaleph-datasource/src/main/java/cn/sliew/scaleph/ds/modal/olap/DorisDataSource.java +++ b/scaleph-datasource/src/main/java/cn/sliew/scaleph/ds/modal/olap/DorisDataSource.java @@ -46,7 +46,6 @@ public class DorisDataSource extends AbstractDataSource { @Schema(description = "username") private String username; - @NotBlank @Schema(description = "password") private String password; diff --git a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/doris/sink/DorisSinkPlugin.java b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/doris/sink/DorisSinkPlugin.java index 2219c97da..10c1df4e7 100644 --- a/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/doris/sink/DorisSinkPlugin.java +++ b/scaleph-plugins/scaleph-plugin-seatunnel-connectors/src/main/java/cn/sliew/scaleph/plugin/seatunnel/flink/connectors/doris/sink/DorisSinkPlugin.java @@ -85,6 +85,8 @@ public ObjectNode createConf() { } if (StringUtils.hasText(dataSource.getPassword())) { conf.putPOJO(PASSWORD.getName(), dataSource.getPassword()); + } else { + conf.putPOJO(PASSWORD.getName(), ""); } if (dataSource.getQueryPort() != null) { conf.putPOJO(QUERY_PORT.getName(), dataSource.getQueryPort()); diff --git a/scaleph-ui-react/src/pages/Metadata/DataSource/Info/StepForms/Props/Doris.tsx b/scaleph-ui-react/src/pages/Metadata/DataSource/Info/StepForms/Props/Doris.tsx index c47b51a43..6ba1fe5fc 100644 --- a/scaleph-ui-react/src/pages/Metadata/DataSource/Info/StepForms/Props/Doris.tsx +++ b/scaleph-ui-react/src/pages/Metadata/DataSource/Info/StepForms/Props/Doris.tsx @@ -75,7 +75,6 @@ const DorisForm: React.FC = () => { name="password" label={intl.formatMessage({id: 'pages.metadata.dataSource.step.props.doris.password'})} colProps={{span: 21, offset: 1}} - rules={[{required: true}]} /> Date: Tue, 21 May 2024 18:29:50 +0800 Subject: [PATCH 4/5] feature: update doris docker compose --- .../Dag/components/node/steps/sink/sink-doris-step.tsx | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/SeaTunnel/Dag/components/node/steps/sink/sink-doris-step.tsx b/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/SeaTunnel/Dag/components/node/steps/sink/sink-doris-step.tsx index 4af7befc8..5e9a36dd9 100644 --- a/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/SeaTunnel/Dag/components/node/steps/sink/sink-doris-step.tsx +++ b/scaleph-ui-react/src/pages/Project/Workspace/DataIntegration/SeaTunnel/Dag/components/node/steps/sink/sink-doris-step.tsx @@ -46,6 +46,10 @@ const SinkDorisStepForm: React.FC> = ({data, visible, onVis onFinish={(values) => { if (onOK) { StepSchemaService.formatCommonConfig(values, DorisParams.dorisConfig, DorisParams.dorisConfig); + // doris.config 是必需参数 + if (!values.has(DorisParams.dorisConfig)) { + values[DorisParams.dorisConfig] = JSON.stringify({}) + } onOK(values) return Promise.resolve(true) } From 7b8ba32aa23864b2e340a2b2242a486b2a68de02 Mon Sep 17 00:00:00 2001 From: wangqi Date: Tue, 21 May 2024 20:51:18 +0800 Subject: [PATCH 5/5] feature: update doris and kafka sink --- tools/docker/mysql/init.d/scaleph-dag-mysql.sql | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/docker/mysql/init.d/scaleph-dag-mysql.sql b/tools/docker/mysql/init.d/scaleph-dag-mysql.sql index f8bf4c002..f74642497 100644 --- a/tools/docker/mysql/init.d/scaleph-dag-mysql.sql +++ b/tools/docker/mysql/init.d/scaleph-dag-mysql.sql @@ -224,7 +224,7 @@ INSERT INTO `dag_config_step`(`id`, `dag_id`, `step_id`, `step_name`, `position_ `step_meta`, `step_attrs`, `creator`, `editor`) VALUES (23, 11, '6faa1187-093e-4a1f-867c-ebe5c6ed1251', 'Kafka Sink', 380, 163, NULL, NULL, '{\"name\":\"Kafka\",\"type\":\"sink\",\"engine\":\"seatunnel\"}', - '{\"stepTitle\":\"Kafka Sink\",\"dataSourceType\":\"Kafka\",\"dataSource\":7,\"topic\":\"sample_data_e_commerce\",\"semantic\":\"AT_LEAST_ONCE\",\"format\":\"debezium_json\",\"schema\":\"{\\\"fields\\\":{}}\",\"kafka.config\":\"{}\",\"partition_key_fields\":\"[]\",\"assign_partitions\":\"[]\"}', + '{\"stepTitle\":\"Kafka Sink\",\"dataSourceType\":\"Kafka\",\"dataSource\":7,\"topic\":\"sample_data_e_commerce\",\"semantic\":\"AT_LEAST_ONCE\",\"format\":\"debezium_json\",\"schema\":\"{\\\"fields\\\":{}}\"}', 'sys', 'sys'); INSERT INTO `dag_config_step`(`id`, `dag_id`, `step_id`, `step_name`, `position_x`, `position_y`, `shape`, `style`, `step_meta`, `step_attrs`, `creator`, `editor`) @@ -236,7 +236,7 @@ INSERT INTO `dag_config_step`(`id`, `dag_id`, `step_id`, `step_name`, `position_ `step_meta`, `step_attrs`, `creator`, `editor`) VALUES (25, 11, 'ed9c7440-e6b6-47c7-bf97-1051392174eb', 'Doris Sink', 380, 383, NULL, NULL, '{\"name\":\"Doris\",\"type\":\"sink\",\"engine\":\"seatunnel\"}', - '{\"stepTitle\":\"Doris Sink\",\"dataSourceType\":\"Doris\",\"dataSource\":9,\"database\":\"ods\",\"table\":\"ods_data_service_mysql_data_service\",\"sink.label-prefix\":\"scaelph_seatunnel_\",\"sink.enable-2pc\":true,\"sink.enable-delete\":false,\"needs_unsupported_type_casting\":false,\"sink.check-interval\":10000,\"sink.max-retries\":3,\"sink.buffer-size\":262144,\"sink.buffer-count\":3,\"doris.batch.size\":1024,\"schema_save_mode\":\"CREATE_SCHEMA_WHEN_NOT_EXIST\",\"data_save_mode\":\"APPEND_DATA\",\"doris.config\":\"{}\"}', + '{\"stepTitle\":\"Doris Sink\",\"dataSourceType\":\"Doris\",\"dataSource\":9,\"database\":\"ods\",\"table\":\"ods_data_service_mysql_data_service\",\"sink.label-prefix\":\"test_\",\"sink.enable-2pc\":true,\"sink.enable-delete\":false,\"needs_unsupported_type_casting\":false,\"sink.check-interval\":10000,\"sink.max-retries\":3,\"sink.buffer-size\":262144,\"sink.buffer-count\":3,\"doris.batch.size\":1024,\"schema_save_mode\":\"CREATE_SCHEMA_WHEN_NOT_EXIST\",\"data_save_mode\":\"APPEND_DATA\",\"doris.config_common_config_\":[{\"doris.config_common_config_key_\":\"format\",\"doris.config_common_config_value_\":\"json\"},{\"doris.config_common_config_key_\":\"read_json_by_line\",\"doris.config_common_config_value_\":\"true\"}],\"doris.config\":\"{\\\"format\\\":\\\"json\\\",\\\"read_json_by_line\\\":\\\"true\\\"}\"}', 'sys', 'sys'); INSERT INTO `dag_config_step`(`id`, `dag_id`, `step_id`, `step_name`, `position_x`, `position_y`, `shape`, `style`, `step_meta`, `step_attrs`, `creator`, `editor`)