[Feature][scaleph-workspace-seatunnel] upgrade some seatunnel connectors to 2.3.5 version (#715)

* feature: upgrade seatunnel 2.3.5

* feature: upgrade seatunnel 2.3.5

* feature: upgrade seatunnel 2.3.5

* feature: seatunnel iceberg connectors

* feature: add seatunnel 2.3.5 iceberg sink connectors

* feature: update seatunnel 2.3.5 iceberg connectors

* refactor: update seatunnel dag step form common components

* refactor: update seatunnel 2.3.5 cassandra connectors

* refactor: update seatunnel 2.3.5 cdc connectors

* refactor: update seatunnel 2.3.5 cdc connectors

* refactor: update seatunnel 2.3.5 clickhouse connectors

* refactor: update seatunnel 2.3.5 doris connectors

* refactor: update seatunnel 2.3.5 amazon dynamodb connectors

* refactor: update seatunnel 2.3.5 amazon dynamodb connectors

* refactor: update seatunnel 2.3.5 elasticsearch connectors

* refactor: update seatunnel 2.3.5 feishu connectors

* refactor: update seatunnel 2.3.5 feishu connectors

---------

Co-authored-by: wangqi <[email protected]>
kalencaya and wangqi authored May 10, 2024
1 parent a07c8b3 commit 3248f1f
Showing 85 changed files with 2,332 additions and 2,025 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/release-manual-docker-seatunnel.yml
@@ -22,10 +22,10 @@ on:
seatunnelVersion:
description: 'seatunnel version'
required: true
-        default: '2.3.4'
+        default: '2.3.5'
type: choice
options:
-          - 2.3.4
+          - 2.3.5
flinkVersion:
description: 'flink version'
required: true
2 changes: 1 addition & 1 deletion README.md
@@ -13,7 +13,7 @@ Scaleph is driven by personal interest and evolves actively through faithful dev
* Data Integration
* Web-ui click-and-drag data integration ways backended by [Apache SeaTunnel](https://seatunnel.apache.org/) on Flink engine.

-  * Support the latest 2.3.4 V2 out-of-the-box connectors and transforms.
+  * Support the latest 2.3.5 V2 out-of-the-box connectors and transforms.

* DataSource management.

@@ -38,8 +38,7 @@ public enum FlinkImageMapping {
SQL_1_17(FlinkJobType.SQL, OperatorFlinkVersion.v1_17, FlinkVersionMapping.V_1_17, "ghcr.io/flowerfine/scaleph-sql-template:1.17"),
SQL_1_18(FlinkJobType.SQL, OperatorFlinkVersion.v1_18, FlinkVersionMapping.V_1_18, "ghcr.io/flowerfine/scaleph-sql-template:1.18"),

-    SEATUNNEL_1_15(FlinkJobType.SEATUNNEL, OperatorFlinkVersion.v1_15, FlinkVersionMapping.V_1_15, "ghcr.io/flowerfine/scaleph-seatunnel:2.3.4-flink-1.15"),
-    SEATUNNEL_1_16(FlinkJobType.SEATUNNEL, OperatorFlinkVersion.v1_16, FlinkVersionMapping.V_1_16, "ghcr.io/flowerfine/scaleph-seatunnel:2.3.4-flink-1.16"),
+    SEATUNNEL_1_16(FlinkJobType.SEATUNNEL, OperatorFlinkVersion.v1_16, FlinkVersionMapping.V_1_16, "ghcr.io/flowerfine/scaleph-seatunnel:2.3.5-flink-1.16"),
FLINK_CDC_1_18(FlinkJobType.FLINK_CDC, OperatorFlinkVersion.v1_18, FlinkVersionMapping.V_1_18, "ghcr.io/flowerfine/scaleph-flink-cdc:3.0.0-flink-1.18"),
;

@@ -93,6 +93,8 @@ public enum SeaTunnelPluginMapping {
SINK_REDIS(SEATUNNEL, SINK, REDIS, "connector-redis", BETA),
SOURCE_ELASTICSEARCH(SEATUNNEL, SOURCE, ELASTICSEARCH, "connector-elasticsearch", UNKNOWN, BATCH, SCHEMA_PROJECTION),
SINK_ELASTICSEARCH(SEATUNNEL, SINK, ELASTICSEARCH, "connector-elasticsearch", GA, CDC),
+    SOURCE_EASYSEARCH(SEATUNNEL, SOURCE, EASYSEARCH, "connector-easysearch", UNKNOWN, BATCH, SCHEMA_PROJECTION),
+    SINK_EASYSEARCH(SEATUNNEL, SINK, EASYSEARCH, "connector-easysearch", UNKNOWN, CDC),
SOURCE_MONGODB(SEATUNNEL, SOURCE, MONGODB, "connector-mongodb", BETA, BATCH, EXACTLY_ONCE, SCHEMA_PROJECTION, PARALLELISM, SUPPORT_USER_DEFINED_SPLIT),
SINK_MONGODB(SEATUNNEL, SINK, MONGODB, "connector-mongodb", BETA, EXACTLY_ONCE, CDC),
SOURCE_AMAZON_DYNAMODB(SEATUNNEL, SOURCE, AMAZON_DYNAMODB, "connector-amazondynamodb", BETA, BATCH, SCHEMA_PROJECTION),
@@ -120,6 +122,7 @@ public enum SeaTunnelPluginMapping {
SINK_STARROCKS(SEATUNNEL, SINK, STARROCKS, "connector-starrocks", ALPHA),
SOURCE_HUDI(SEATUNNEL, SOURCE, HUDI, "connector-hudi", BETA, BATCH, EXACTLY_ONCE, PARALLELISM),
SOURCE_ICEBERG(SEATUNNEL, SOURCE, ICEBERG, "connector-iceberg", BETA, BATCH, STREAM, EXACTLY_ONCE, SCHEMA_PROJECTION, PARALLELISM),
+    SINK_ICEBERG(SEATUNNEL, SINK, ICEBERG, "connector-iceberg", UNKNOWN, CDC),
SOURCE_PAIMON(SEATUNNEL, SOURCE, PAIMON, "connector-paimon", UNKNOWN, BATCH),
SINK_PAIMON(SEATUNNEL, SINK, PAIMON, "connector-paimon", UNKNOWN, EXACTLY_ONCE),
SINK_S3REDSHIFT(SEATUNNEL, SINK, S3REDSHIFT, "connector-s3-redshift", GA, EXACTLY_ONCE),
@@ -67,6 +67,7 @@ public enum SeaTunnelPluginName implements DictInstance {
JDBC("Jdbc", "Jdbc"),
REDIS("Redis", "Redis"),
ELASTICSEARCH("Elasticsearch", "Elasticsearch"),
EASYSEARCH("Easysearch", "Easysearch"),
MONGODB("MongoDB", "MongoDB"),
AMAZON_DYNAMODB("AmazonDynamodb", "AmazonDynamodb"),
CASSANDRA("Cassandra", "Cassandra"),
@@ -30,6 +30,7 @@ public enum SeaTunnelVersion implements DictInstance {

V_2_3_3("2.3.3", "2.3.3"),
V_2_3_4("2.3.4", "2.3.4"),
V_2_3_5("2.3.5", "2.3.5"),
;

@JsonCreator
@@ -30,6 +30,7 @@
import org.springframework.util.StringUtils;

import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.NotNull;
import java.util.HashMap;
import java.util.Map;

@@ -49,7 +50,7 @@ public class DorisDataSource extends AbstractDataSource {
@Schema(description = "password")
private String password;

-    @NotBlank
+    @NotNull
@Schema(description = "fenodes query port")
private Integer queryPort;

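For context on the constraint swap above: under Bean Validation (JSR 380), @NotBlank is defined only for CharSequence fields, so a provider such as Hibernate Validator rejects it on an Integer like queryPort with an UnexpectedTypeException, whereas @NotNull applies to any type. A minimal sketch, with DemoDataSource as a hypothetical stand-in for DorisDataSource:

import javax.validation.Validation;
import javax.validation.Validator;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;

public class ValidationSketch {

    // Hypothetical stand-in for DorisDataSource, reduced to the two relevant fields.
    static class DemoDataSource {
        @NotBlank          // fine: String is a CharSequence
        String nodeUrls;

        @NotNull           // correct for numbers; @NotBlank here would throw UnexpectedTypeException
        Integer queryPort;
    }

    public static void main(String[] args) {
        Validator validator = Validation.buildDefaultValidatorFactory().getValidator();
        DemoDataSource ds = new DemoDataSource();
        ds.nodeUrls = "fe-host:8030";
        ds.queryPort = null;
        // Expect a single violation, reported on queryPort.
        validator.validate(ds).forEach(v ->
                System.out.println(v.getPropertyPath() + ": " + v.getMessage()));
    }
}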
@@ -70,8 +70,8 @@ public enum CDCSourceProperties {
.validateAndBuild();

public static final PropertyDescriptor<JsonNode> TABLE_CONFIG = new PropertyDescriptor.Builder()
.name("table-names")
.description("Table name of the database to monitor.")
.name("table-names-config")
.description("Table config list.")
.type(PropertyType.OBJECT)
.parser(Parsers.JSON_PARSER)
.addValidator(Validators.NON_BLANK_VALIDATOR)
@@ -86,6 +86,9 @@ public enum CDCSourceProperties {
.addValidator(Validators.NON_BLANK_VALIDATOR)
.validateAndBuild();

+    /**
+     * fixme support timestamp?
+     */
public static final PropertyDescriptor<Long> STARTUP_TIMESTAMP = new PropertyDescriptor.Builder()
.name("startup.timestamp")
.description("Start from the specified epoch timestamp (in milliseconds).")
@@ -143,6 +146,9 @@ public enum CDCSourceProperties {
.addValidator(Validators.POSITIVE_LONG_VALIDATOR)
.validateAndBuild();

+    /**
+     * fixme does all jdbc cdc support this?
+     */
public static final PropertyDescriptor<Integer> INCREMENTAL_PARALLELISM = new PropertyDescriptor.Builder()
.name("incremental.parallelism")
.description("The number of parallel readers in the incremental phase.")
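The rename above means the descriptor now carries a list of per-table config objects rather than a single monitored table name. A hedged Jackson sketch of the JSON this OBJECT-typed property would hold; the table/primaryKeys field names are an assumption based on my reading of the SeaTunnel 2.3.5 CDC docs, not something this diff confirms:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class TableNamesConfigSketch {
    public static void main(String[] args) throws Exception {
        // One object per table; field names assumed from the SeaTunnel 2.3.5 docs.
        String json = "[{\"table\": \"testdb.table1\", \"primaryKeys\": [\"id\"]}]";
        JsonNode tableNamesConfig = new ObjectMapper().readTree(json);
        System.out.println(tableNamesConfig.isArray()); // true: a list of table configs
    }
}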
@@ -19,7 +19,8 @@
package cn.sliew.scaleph.plugin.seatunnel.flink.connectors.cdc.mongodb.source;

import cn.sliew.scaleph.plugin.framework.property.*;
-import com.fasterxml.jackson.databind.JsonNode;

+import java.util.List;

public enum MongoDBCDCSourceProperties {
;
@@ -49,7 +50,7 @@ public enum MongoDBCDCSourceProperties {
.addValidator(Validators.NON_BLANK_VALIDATOR)
.validateAndBuild();

-    public static final PropertyDescriptor<JsonNode> DATABASE = new PropertyDescriptor.Builder()
+    public static final PropertyDescriptor<List<String>> DATABASE = new PropertyDescriptor.Builder()
.name("database")
.description("Name of the database to watch for changes")
.type(PropertyType.OBJECT)
Expand All @@ -58,7 +59,7 @@ public enum MongoDBCDCSourceProperties {
.addValidator(Validators.NON_BLANK_VALIDATOR)
.validateAndBuild();

-    public static final PropertyDescriptor<JsonNode> COLLECTION = new PropertyDescriptor.Builder()
+    public static final PropertyDescriptor<List<String>> COLLECTION = new PropertyDescriptor.Builder()
.name("collection")
.description("Name of the collection in the database to watch for changes")
.type(PropertyType.OBJECT)
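The JsonNode → List&lt;String&gt; change matches MongoDB-CDC accepting several databases and collections per source. A trivial, hypothetical sketch of the values the two descriptors would carry:

import java.util.List;

public class MongoCdcValuesSketch {
    public static void main(String[] args) {
        // Hypothetical values: multiple databases, collections qualified as "db.collection".
        List<String> database = List.of("inventory", "orders");
        List<String> collection = List.of("inventory.products", "orders.line_items");
        System.out.println(database + " / " + collection);
    }
}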
@@ -22,7 +22,6 @@
import cn.sliew.scaleph.plugin.framework.property.PropertyDescriptor;
import cn.sliew.scaleph.plugin.framework.property.PropertyType;
import cn.sliew.scaleph.plugin.framework.property.Validators;
-import com.fasterxml.jackson.databind.JsonNode;

public enum DynamoDBSinkProperties {
;
@@ -35,12 +34,15 @@ public enum DynamoDBSinkProperties {
.addValidator(Validators.POSITIVE_INTEGER_VALIDATOR)
.validateAndBuild();

+    /**
+     * fixme seatunnel support this?
+     */
public static final PropertyDescriptor<Integer> BATCH_INTERVAL_MS = new PropertyDescriptor.Builder()
.name("batch_interval_ms")
.description("The batch interval of Amazon DynamoDB")
.type(PropertyType.INT)
.parser(Parsers.INTEGER_PARSER)
.addValidator(Validators.POSITIVE_INTEGER_VALIDATOR)
.validateAndBuild();

}
@@ -61,6 +61,9 @@ public enum ElasticsearchSourceProperties {
.defaultValue(100)
.validateAndBuild();

+    /**
+     * fixme support schema?
+     */
public static final PropertyDescriptor<JsonNode> SCHEMA = new PropertyDescriptor.Builder()
.name("schema")
.description("The structure of the data, including field names and field types")
@@ -52,6 +52,15 @@ public enum FakeProperties {
.addValidator(Validators.POSITIVE_INTEGER_VALIDATOR)
.validateAndBuild();

+    public static final PropertyDescriptor<JsonNode> ROWS = new PropertyDescriptor.Builder<JsonNode>()
+            .name("rows")
+            .description(
+                    "The row list of fake data output per degree of parallelism")
+            .type(PropertyType.OBJECT)
+            .parser(Parsers.JSON_PARSER)
+            .addValidator(Validators.NON_BLANK_VALIDATOR)
+            .validateAndBuild();

public static final PropertyDescriptor<Integer> SPLIT_NUM = new PropertyDescriptor.Builder<Integer>()
.name("split.num")
.description("the number of splits generated by the enumerator for each degree of parallelism")
@@ -106,15 +115,6 @@ public enum FakeProperties {
.addValidator(Validators.POSITIVE_INTEGER_VALIDATOR)
.validateAndBuild();

-    public static final PropertyDescriptor<JsonNode> ROWS = new PropertyDescriptor.Builder<JsonNode>()
-            .name("rows")
-            .description(
-                    "The row list of fake data output per degree of parallelism")
-            .type(PropertyType.OBJECT)
-            .parser(Parsers.JSON_PARSER)
-            .addValidator(Validators.NON_BLANK_VALIDATOR)
-            .validateAndBuild();

public static final PropertyDescriptor<String> STRING_FAKE_MODE = new PropertyDescriptor.Builder()
.name("string.fake.mode")
.description("The fake mode of generating string data")
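A hedged Jackson sketch of the row list the relocated ROWS descriptor parses; the kind/fields layout follows the SeaTunnel FakeSource docs as I read them and should be treated as an assumption:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class FakeRowsSketch {
    public static void main(String[] args) throws Exception {
        // Each entry is one generated row: a change kind plus the field values.
        String json = "[{\"kind\": \"INSERT\", \"fields\": [1, \"A\", 100]},"
                + " {\"kind\": \"UPDATE_AFTER\", \"fields\": [1, \"B\", 100]}]";
        JsonNode rows = new ObjectMapper().readTree(json);
        System.out.println(rows.size() + " fake rows"); // 2 fake rows
    }
}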
@@ -43,13 +43,13 @@ public FakeSourcePlugin() {
props.add(TABLES_CONFIGS);
props.add(SCHEMA);
props.add(ROW_NUM);
+        props.add(ROWS);
props.add(SPLIT_NUM);
props.add(SPLIT_READ_INTERVAL);
props.add(MAP_SIZE);
props.add(ARRAY_SIZE);
props.add(BYTES_SIZE);
props.add(STRING_SIZE);
-        props.add(ROWS);

props.add(STRING_FAKE_MODE);
props.add(STRING_TEMPLATE);
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.scaleph.plugin.seatunnel.flink.connectors.iceberg;

import cn.sliew.scaleph.plugin.framework.property.*;
import com.fasterxml.jackson.databind.JsonNode;

public enum IcebergProperties {
;

public static final PropertyDescriptor<String> CATALOG_NAME = new PropertyDescriptor.Builder<String>()
.name("catalog_name")
.description("catalog name")
.type(PropertyType.STRING)
.parser(Parsers.STRING_PARSER)
.properties(Property.Required)
.addValidator(Validators.NON_BLANK_VALIDATOR)
.validateAndBuild();

public static final PropertyDescriptor<String> NAMESPACE = new PropertyDescriptor.Builder<String>()
.name("namespace")
.description("database name in the backend catalog")
.type(PropertyType.STRING)
.parser(Parsers.STRING_PARSER)
.properties(Property.Required)
.addValidator(Validators.NON_BLANK_VALIDATOR)
.validateAndBuild();

public static final PropertyDescriptor<String> TABLE = new PropertyDescriptor.Builder<String>()
.name("table")
.description("iceberg table name in the backend catalog")
.type(PropertyType.STRING)
.parser(Parsers.STRING_PARSER)
.properties(Property.Required)
.addValidator(Validators.NON_BLANK_VALIDATOR)
.validateAndBuild();

public static final PropertyDescriptor<JsonNode> ICEBERG_CATALOG_CONFIG = new PropertyDescriptor.Builder()
.name("iceberg.catalog.config")
.description("Specify the properties for initializing the Iceberg catalog.")
.type(PropertyType.OBJECT)
.parser(Parsers.JSON_PARSER)
.properties(Property.Required)
.validateAndBuild();

public static final PropertyDescriptor<String> ICEBERG_HADOOP_CONF_PATH = new PropertyDescriptor.Builder()
.name("iceberg.hadoop-conf-path")
.description("The specified loading paths for the 'core-site.xml', 'hdfs-site.xml', 'hive-site.xml' files.")
.type(PropertyType.STRING)
.parser(Parsers.STRING_PARSER)
.addValidator(Validators.NON_BLANK_VALIDATOR)
.validateAndBuild();

public static final PropertyDescriptor<JsonNode> HADOOP_CONFIG = new PropertyDescriptor.Builder()
.name("hadoop.config")
.description("Properties passed through to the Hadoop configuration.")
.type(PropertyType.OBJECT)
.parser(Parsers.JSON_PARSER)
.validateAndBuild();

public static final PropertyDescriptor<Boolean> CASE_SENSITIVE = new PropertyDescriptor.Builder<Boolean>()
.name("case_sensitive")
.description("If data columns where selected via fields(Collection), controls whether the match to the schema will be done with case sensitivity.")
.type(PropertyType.BOOLEAN)
.parser(Parsers.BOOLEAN_PARSER)
.addValidator(Validators.BOOLEAN_VALIDATOR)
.validateAndBuild();

}
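To make ICEBERG_CATALOG_CONFIG concrete, a hedged Jackson sketch of the catalog-properties object it is meant to hold; the type/warehouse keys mirror Iceberg catalog configuration as the SeaTunnel docs use it, and the warehouse URI is hypothetical:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class IcebergCatalogConfigSketch {
    public static void main(String[] args) {
        // Properties passed through to Iceberg's catalog factory; values are hypothetical.
        ObjectNode catalogConfig = new ObjectMapper().createObjectNode()
                .put("type", "hadoop")                               // or "hive"
                .put("warehouse", "hdfs://namenode:8020/warehouse"); // hypothetical path
        System.out.println(catalogConfig);
    }
}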
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.scaleph.plugin.seatunnel.flink.connectors.iceberg.sink;

import cn.sliew.scaleph.common.dict.seatunnel.SeaTunnelPluginMapping;
import cn.sliew.scaleph.plugin.framework.core.PluginInfo;
import cn.sliew.scaleph.plugin.framework.property.PropertyDescriptor;
import cn.sliew.scaleph.plugin.seatunnel.flink.SeaTunnelConnectorPlugin;
import cn.sliew.scaleph.plugin.seatunnel.flink.env.CommonProperties;
import com.google.auto.service.AutoService;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static cn.sliew.scaleph.plugin.seatunnel.flink.connectors.SaveModeProperties.DATA_SAVE_MODE;
import static cn.sliew.scaleph.plugin.seatunnel.flink.connectors.SaveModeProperties.SCHEMA_SAVE_MODE;
import static cn.sliew.scaleph.plugin.seatunnel.flink.connectors.iceberg.IcebergProperties.*;
import static cn.sliew.scaleph.plugin.seatunnel.flink.connectors.iceberg.sink.IcebergSinkProperties.*;

@AutoService(SeaTunnelConnectorPlugin.class)
public class IcebergSinkPlugin extends SeaTunnelConnectorPlugin {

public IcebergSinkPlugin() {
this.pluginInfo = new PluginInfo(getIdentity(),
"Apache Iceberg is an open table format for huge analytic datasets",
IcebergSinkPlugin.class.getName());

final List<PropertyDescriptor> props = new ArrayList<>();
props.add(CATALOG_NAME);
props.add(NAMESPACE);
props.add(TABLE);
props.add(ICEBERG_CATALOG_CONFIG);
props.add(ICEBERG_HADOOP_CONF_PATH);
props.add(HADOOP_CONFIG);
props.add(CASE_SENSITIVE);
props.add(ICEBERG_TABEL_WRITE_PROPS);
props.add(ICEBERG_TABEL_AUTO_CREATE_PROPS);
props.add(ICEBERG_TABEL_PRIMARY_KEYS);
props.add(ICEBERG_TABEL_PARTITION_KEYS);
props.add(ICEBERG_TABEL_SCHEMA_EVOLUTION_ENABLED);
props.add(ICEBERG_TABEL_UPSERT_MODE_ENABLED);
props.add(ICEBERG_TABEL_COMMIT_BRANCH);
props.add(SCHEMA_SAVE_MODE);
props.add(DATA_SAVE_MODE);
props.add(CommonProperties.PARALLELISM);
props.add(CommonProperties.RESULT_TABLE_NAME);
supportedProperties = Collections.unmodifiableList(props);
}

@Override
protected SeaTunnelPluginMapping getPluginMapping() {
return SeaTunnelPluginMapping.SINK_ICEBERG;
}
}
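Because the class carries @AutoService(SeaTunnelConnectorPlugin.class), the annotation processor writes the META-INF/services entry at compile time, so the new sink is discoverable through the standard ServiceLoader mechanism. A minimal sketch, assuming the scaleph plugin modules are on the classpath:

import java.util.ServiceLoader;

import cn.sliew.scaleph.plugin.seatunnel.flink.SeaTunnelConnectorPlugin;

public class PluginDiscoverySketch {
    public static void main(String[] args) {
        // IcebergSinkPlugin shows up here alongside the other registered connectors.
        for (SeaTunnelConnectorPlugin plugin : ServiceLoader.load(SeaTunnelConnectorPlugin.class)) {
            System.out.println(plugin.getClass().getSimpleName());
        }
    }
}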