diff --git a/docs/content.zh/docs/connectors/table/kafka.md b/docs/content.zh/docs/connectors/table/kafka.md
index 27c7c9b19..286a922ef 100644
--- a/docs/content.zh/docs/connectors/table/kafka.md
+++ b/docs/content.zh/docs/connectors/table/kafka.md
@@ -81,7 +81,7 @@ CREATE TABLE KafkaTable (
topic |
STRING NOT NULL |
Topic name of the Kafka record. |
- R |
+ R/W |
partition |
@@ -191,17 +191,17 @@ CREATE TABLE KafkaTable (
topic |
- required for sink |
+ optional |
(none) |
String |
- Topic name(s) to read data from when the table is used as source. It also supports a semicolon-separated topic list for sources, e.g. 'topic-1;topic-2'. Note that for source tables, only one of 'topic' and 'topic-pattern' can be specified. When the table is used as sink, this option is the topic name to write data to. Note that topic lists are not supported for sinks. |
+ Topic name(s) to read data from when the table is used as source, or topic name(s) to write data to when the table is used as sink. It also supports a semicolon-separated topic list for sources, e.g. 'topic-1;topic-2'. Note that only one of 'topic' and 'topic-pattern' can be specified. Topic lists are also supported for sinks; the provided list is treated as an allow list of valid values for the `topic` metadata column. If a list with multiple topics is provided, the `topic` metadata column of the sink table is writable and must be specified. |
topic-pattern |
optional |
(none) |
String |
- The regular expression for a pattern of topic names to read from. All topics with names matching the regular expression are subscribed by the Kafka consumer when the job starts running. Note that for source tables, only one of 'topic' and 'topic-pattern' can be specified. |
+ The regular expression for a pattern of topic names to read from or write to. All topics with names matching the regular expression are subscribed by the consumer when the job starts running. For sinks, the `topic` metadata column is writable, must be provided, and must match the 'topic-pattern' regex. Note that only one of 'topic' and 'topic-pattern' can be specified. |
properties.bootstrap.servers |
diff --git a/docs/content.zh/docs/connectors/table/upsert-kafka.md b/docs/content.zh/docs/connectors/table/upsert-kafka.md
index df2c13878..3d28ae56b 100644
--- a/docs/content.zh/docs/connectors/table/upsert-kafka.md
+++ b/docs/content.zh/docs/connectors/table/upsert-kafka.md
@@ -119,7 +119,7 @@ of all available metadata fields.
required |
(none) |
String |
- The Kafka topic name to read from and write to. |
+ Topic name(s) to read data from when the table is used as source, or topic name(s) to write data to when the table is used as sink. It also supports a semicolon-separated topic list for sources, e.g. 'topic-1;topic-2'. Note that only one of 'topic' and 'topic-pattern' can be specified. Topic lists are also supported for sinks; the provided list is treated as an allow list of valid values for the `topic` metadata column. If a list with multiple topics is provided, the `topic` metadata column of the sink table is writable and must be specified. |
properties.bootstrap.servers |
diff --git a/docs/content/docs/connectors/table/kafka.md b/docs/content/docs/connectors/table/kafka.md
index 55a5dbf27..5756315bc 100644
--- a/docs/content/docs/connectors/table/kafka.md
+++ b/docs/content/docs/connectors/table/kafka.md
@@ -83,7 +83,7 @@ Read-only columns must be declared `VIRTUAL` to exclude them during an `INSERT I
topic |
STRING NOT NULL |
Topic name of the Kafka record. |
- R |
+ R/W |
partition |
@@ -196,11 +196,11 @@ Connector Options
topic |
- required for sink |
+ optional |
yes |
(none) |
String |
- Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by semicolon like 'topic-1;topic-2' . Note, only one of "topic-pattern" and "topic" can be specified for sources. When the table is used as sink, the topic name is the topic to write data to. Note topic list is not supported for sinks. |
+ Topic name(s) to read data from when the table is used as source, or topic name(s) to write data to when the table is used as sink. It also supports topic lists for sources by separating topics with a semicolon like 'topic-1;topic-2'. Note, only one of "topic-pattern" and "topic" can be specified. Topic lists are also supported for sinks; the provided topic list is treated as an allow list of valid values for the `topic` metadata column. If a list with multiple topics is provided, the `topic` metadata column of the sink table is writable and must be specified. |
topic-pattern |
@@ -208,7 +208,7 @@ Connector Options
yes |
(none) |
String |
- The regular expression for a pattern of topic names to read from. All topics with names that match the specified regular expression will be subscribed by the consumer when the job starts running. Note, only one of "topic-pattern" and "topic" can be specified for sources. |
+ The regular expression for a pattern of topic names to read from or write to. All topics with names that match the specified regular expression will be subscribed by the consumer when the job starts running. For sinks, the `topic` metadata column is writable, must be provided, and must match the `topic-pattern` regex. Note, only one of "topic-pattern" and "topic" can be specified. |
properties.bootstrap.servers |
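For illustration, a minimal DDL sketch of the sink behaviour described by the two options above; the table name, columns, broker address, and 'json' format are assumptions, not part of this change:

```sql
-- Hypothetical sink with a topic allow list; the writable `topic` metadata
-- column selects the target topic for each record and must be one of the
-- listed topics.
CREATE TABLE MultiTopicSink (
  `topic` STRING METADATA,
  `user_id` BIGINT,
  `message` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic-1;topic-2',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);

-- Route each record to a topic chosen at query time.
INSERT INTO MultiTopicSink
SELECT CASE WHEN MOD(user_id, 2) = 0 THEN 'topic-1' ELSE 'topic-2' END, user_id, message
FROM SourceTable;
```

With 'topic-pattern' instead of 'topic', the same DDL applies except that the value written to the `topic` metadata column must match the configured regex.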
diff --git a/docs/content/docs/connectors/table/upsert-kafka.md b/docs/content/docs/connectors/table/upsert-kafka.md
index eb662349c..e8e38aeda 100644
--- a/docs/content/docs/connectors/table/upsert-kafka.md
+++ b/docs/content/docs/connectors/table/upsert-kafka.md
@@ -129,7 +129,7 @@ Connector Options
required |
(none) |
String |
- The Kafka topic name to read from and write to. |
+ Topic name(s) to read data from when the table is used as source, or topic name(s) to write data to when the table is used as sink. It also supports topic lists for sources by separating topics with a semicolon like 'topic-1;topic-2'. Note, only one of "topic-pattern" and "topic" can be specified. Topic lists are also supported for sinks; the provided topic list is treated as an allow list of valid values for the `topic` metadata column. If a list with multiple topics is provided, the `topic` metadata column of the sink table is writable and must be specified. |
properties.bootstrap.servers |
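As a sketch of the updated upsert-kafka 'topic' option (table name, topics, and broker address are assumptions), a table reading from a semicolon-separated topic list:

```sql
-- Hypothetical upsert-kafka table over a topic list.
CREATE TABLE PageViewsPerRegion (
  `region` STRING,
  `view_count` BIGINT,
  PRIMARY KEY (`region`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'pageviews-eu;pageviews-us',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);
```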
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
index 7908aded6..71ca41474 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
@@ -30,12 +30,20 @@
import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
import static org.apache.flink.util.Preconditions.checkNotNull;
/** SerializationSchema used by {@link KafkaDynamicSink} to configure a {@link KafkaSink}. */
class DynamicKafkaRecordSerializationSchema implements KafkaRecordSerializationSchema<RowData> {
- private final String topic;
+ private final Set<String> topics;
+ private final Pattern topicPattern;
private final FlinkKafkaPartitioner<RowData> partitioner;
@Nullable private final SerializationSchema<RowData> keySerialization;
private final SerializationSchema<RowData> valueSerialization;
@@ -44,9 +52,11 @@ class DynamicKafkaRecordSerializationSchema implements KafkaRecordSerializationS
private final boolean hasMetadata;
private final int[] metadataPositions;
private final boolean upsertMode;
+ private final Map<String, Boolean> topicPatternMatches;
DynamicKafkaRecordSerializationSchema(
- String topic,
+ @Nullable List<String> topics,
+ @Nullable Pattern topicPattern,
@Nullable FlinkKafkaPartitioner<RowData> partitioner,
@Nullable SerializationSchema<RowData> keySerialization,
SerializationSchema<RowData> valueSerialization,
@@ -60,7 +70,16 @@ class DynamicKafkaRecordSerializationSchema implements KafkaRecordSerializationS
keySerialization != null && keyFieldGetters.length > 0,
"Key must be set in upsert mode for serialization schema.");
}
- this.topic = checkNotNull(topic);
+ Preconditions.checkArgument(
+ (topics != null && topicPattern == null && topics.size() > 0)
+ || (topics == null && topicPattern != null),
+ "Either Topic or Topic Pattern must be set.");
+ if (topics != null) {
+ this.topics = new HashSet<>(topics);
+ } else {
+ this.topics = null;
+ }
+ this.topicPattern = topicPattern;
this.partitioner = partitioner;
this.keySerialization = keySerialization;
this.valueSerialization = checkNotNull(valueSerialization);
@@ -69,6 +88,8 @@ class DynamicKafkaRecordSerializationSchema implements KafkaRecordSerializationS
this.hasMetadata = hasMetadata;
this.metadataPositions = metadataPositions;
this.upsertMode = upsertMode;
+ // Cache results of topic pattern matches to avoid re-evaluating the pattern for each record
+ this.topicPatternMatches = new HashMap<>();
}
@Override
@@ -77,13 +98,15 @@ public ProducerRecord serialize(
// shortcut in case no input projection is required
if (keySerialization == null && !hasMetadata) {
final byte[] valueSerialized = valueSerialization.serialize(consumedRow);
+ final String targetTopic = getTargetTopic(consumedRow);
return new ProducerRecord<>(
- topic,
+ targetTopic,
extractPartition(
consumedRow,
+ targetTopic,
null,
valueSerialized,
- context.getPartitionsForTopic(topic)),
+ context.getPartitionsForTopic(targetTopic)),
null,
valueSerialized);
}
@@ -115,14 +138,15 @@ public ProducerRecord serialize(
consumedRow, kind, valueFieldGetters);
valueSerialized = valueSerialization.serialize(valueRow);
}
-
+ final String targetTopic = getTargetTopic(consumedRow);
return new ProducerRecord<>(
- topic,
+ targetTopic,
extractPartition(
consumedRow,
+ targetTopic,
keySerialized,
valueSerialized,
- context.getPartitionsForTopic(topic)),
+ context.getPartitionsForTopic(targetTopic)),
readMetadata(consumedRow, KafkaDynamicSink.WritableMetadata.TIMESTAMP),
keySerialized,
valueSerialized,
@@ -144,14 +168,42 @@ public void open(
valueSerialization.open(context);
}
+ private String getTargetTopic(RowData element) {
+ if (topics != null && topics.size() == 1) {
+ // If topics is a singleton list, we only return the provided topic.
+ return topics.stream().findFirst().get();
+ }
+ final String targetTopic = readMetadata(element, KafkaDynamicSink.WritableMetadata.TOPIC);
+ if (targetTopic == null) {
+ throw new IllegalArgumentException(
+ "The topic of the sink record is not valid. Expected a single topic but no topic is set.");
+ } else if (topics != null && !topics.contains(targetTopic)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "The topic of the sink record is not valid. Expected topic to be in: %s but was: %s",
+ topics, targetTopic));
+ } else if (topicPattern != null && !cachedTopicPatternMatch(targetTopic)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "The topic of the sink record is not valid. Expected topic to match: %s but was: %s",
+ topicPattern, targetTopic));
+ }
+ return targetTopic;
+ }
+
+ private boolean cachedTopicPatternMatch(String topic) {
+ return topicPatternMatches.computeIfAbsent(topic, t -> topicPattern.matcher(t).matches());
+ }
+
private Integer extractPartition(
RowData consumedRow,
+ String targetTopic,
@Nullable byte[] keySerialized,
byte[] valueSerialized,
int[] partitions) {
if (partitioner != null) {
return partitioner.partition(
- consumedRow, keySerialized, valueSerialized, topic, partitions);
+ consumedRow, keySerialized, valueSerialized, targetTopic, partitions);
}
return null;
}
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java
index 81ff13c3c..11d3c659f 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java
@@ -118,15 +118,15 @@ public class KafkaConnectorOptions {
.asList()
.noDefaultValue()
.withDescription(
- "Topic names from which the table is read. Either 'topic' or 'topic-pattern' must be set for source. "
- + "Option 'topic' is required for sink.");
+ "Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by semicolon like 'topic-1;topic-2'. Note, only one of 'topic-pattern' and 'topic' can be specified for sources. "
+ + "When the table is used as sink, the topic name is the topic to write data. It also supports topic list for sinks. The provided topic-list is treated as a allow list of valid values for the `topic` metadata column. If a list is provided, for sink table, 'topic' metadata column is writable and must be specified.");
public static final ConfigOption<String> TOPIC_PATTERN =
ConfigOptions.key("topic-pattern")
.stringType()
.noDefaultValue()
.withDescription(
- "Optional topic pattern from which the table is read for source. Either 'topic' or 'topic-pattern' must be set.");
+ "Optional topic pattern from which the table is read for source, or topic pattern that must match the provided `topic` metadata column for sink. Either 'topic' or 'topic-pattern' must be set.");
public static final ConfigOption<String> PROPS_BOOTSTRAP_SERVERS =
ConfigOptions.key("properties.bootstrap.servers")
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java
index d6390e27a..f752276a3 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java
@@ -98,23 +98,22 @@ class KafkaConnectorOptionsUtil {
protected static final String DEBEZIUM_AVRO_CONFLUENT = "debezium-avro-confluent";
private static final List<String> SCHEMA_REGISTRY_FORMATS =
Arrays.asList(AVRO_CONFLUENT, DEBEZIUM_AVRO_CONFLUENT);
-
// --------------------------------------------------------------------------------------------
// Validation
// --------------------------------------------------------------------------------------------
public static void validateTableSourceOptions(ReadableConfig tableOptions) {
- validateSourceTopic(tableOptions);
+ validateTopic(tableOptions);
validateScanStartupMode(tableOptions);
validateScanBoundedMode(tableOptions);
}
public static void validateTableSinkOptions(ReadableConfig tableOptions) {
- validateSinkTopic(tableOptions);
+ validateTopic(tableOptions);
validateSinkPartitioner(tableOptions);
}
- public static void validateSourceTopic(ReadableConfig tableOptions) {
+ public static void validateTopic(ReadableConfig tableOptions) {
Optional<List<String>> topic = tableOptions.getOptional(TOPIC);
Optional<String> pattern = tableOptions.getOptional(TOPIC_PATTERN);
@@ -128,23 +127,6 @@ public static void validateSourceTopic(ReadableConfig tableOptions) {
}
}
- public static void validateSinkTopic(ReadableConfig tableOptions) {
- String errorMessageTemp =
- "Flink Kafka sink currently only supports single topic, but got %s: %s.";
- if (!isSingleTopic(tableOptions)) {
- if (tableOptions.getOptional(TOPIC_PATTERN).isPresent()) {
- throw new ValidationException(
- String.format(
- errorMessageTemp,
- "'topic-pattern'",
- tableOptions.get(TOPIC_PATTERN)));
- } else {
- throw new ValidationException(
- String.format(errorMessageTemp, "'topic'", tableOptions.get(TOPIC)));
- }
- }
- }
-
private static void validateScanStartupMode(ReadableConfig tableOptions) {
tableOptions
.getOptional(SCAN_STARTUP_MODE)
@@ -254,11 +236,11 @@ private static void validateSinkPartitioner(ReadableConfig tableOptions) {
// Utilities
// --------------------------------------------------------------------------------------------
- public static List<String> getSourceTopics(ReadableConfig tableOptions) {
+ public static List<String> getTopics(ReadableConfig tableOptions) {
return tableOptions.getOptional(TOPIC).orElse(null);
}
- public static Pattern getSourceTopicPattern(ReadableConfig tableOptions) {
+ public static Pattern getTopicPattern(ReadableConfig tableOptions) {
return tableOptions.getOptional(TOPIC_PATTERN).map(Pattern::compile).orElse(null);
}
@@ -636,21 +618,25 @@ public static DynamicTableFactory.Context autoCompleteSchemaRegistrySubject(
private static Map<String, String> autoCompleteSchemaRegistrySubject(
Map<String, String> options) {
Configuration configuration = Configuration.fromMap(options);
- // the subject autoComplete should only be used in sink, check the topic first
- validateSinkTopic(configuration);
- final Optional<String> valueFormat = configuration.getOptional(VALUE_FORMAT);
- final Optional<String> keyFormat = configuration.getOptional(KEY_FORMAT);
- final Optional<String> format = configuration.getOptional(FORMAT);
- final String topic = configuration.get(TOPIC).get(0);
-
- if (format.isPresent() && SCHEMA_REGISTRY_FORMATS.contains(format.get())) {
- autoCompleteSubject(configuration, format.get(), topic + "-value");
- } else if (valueFormat.isPresent() && SCHEMA_REGISTRY_FORMATS.contains(valueFormat.get())) {
- autoCompleteSubject(configuration, "value." + valueFormat.get(), topic + "-value");
- }
+ // the subject autoComplete should only be used in sink with a single topic, check the topic
+ // option first
+ validateTopic(configuration);
+ if (configuration.contains(TOPIC) && isSingleTopic(configuration)) {
+ final Optional<String> valueFormat = configuration.getOptional(VALUE_FORMAT);
+ final Optional<String> keyFormat = configuration.getOptional(KEY_FORMAT);
+ final Optional<String> format = configuration.getOptional(FORMAT);
+ final String topic = configuration.get(TOPIC).get(0);
+
+ if (format.isPresent() && SCHEMA_REGISTRY_FORMATS.contains(format.get())) {
+ autoCompleteSubject(configuration, format.get(), topic + "-value");
+ } else if (valueFormat.isPresent()
+ && SCHEMA_REGISTRY_FORMATS.contains(valueFormat.get())) {
+ autoCompleteSubject(configuration, "value." + valueFormat.get(), topic + "-value");
+ }
- if (keyFormat.isPresent() && SCHEMA_REGISTRY_FORMATS.contains(keyFormat.get())) {
- autoCompleteSubject(configuration, "key." + keyFormat.get(), topic + "-key");
+ if (keyFormat.isPresent() && SCHEMA_REGISTRY_FORMATS.contains(keyFormat.get())) {
+ autoCompleteSubject(configuration, "key." + keyFormat.get(), topic + "-key");
+ }
}
return configuration.toMap();
}
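To illustrate the guard above: schema-registry subject auto-completion now only applies when 'topic' resolves to a single topic. A sketch, with table name, topic, and registry URL as assumptions; with a topic list or 'topic-pattern', the subject options must be set explicitly instead:

```sql
-- Hypothetical single-topic sink: the avro-confluent subject is derived
-- automatically as 'user_behavior-value'.
CREATE TABLE AvroSink (
  `user_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'avro-confluent',
  'avro-confluent.url' = 'http://localhost:8081'
);
```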
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
index 3f6bc5a27..8ab0f10c6 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
@@ -59,6 +59,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
+import java.util.regex.Pattern;
import java.util.stream.Stream;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -114,8 +115,11 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
*/
@Nullable private final String transactionalIdPrefix;
- /** The Kafka topic to write to. */
- protected final String topic;
+ /** The Kafka topics that the sink is allowed to write to. */
+ protected final List<String> topics;
+
+ /** The pattern of Kafka topic names that the sink is allowed to write to. */
+ protected final Pattern topicPattern;
/** Properties for the Kafka producer. */
protected final Properties properties;
@@ -143,7 +147,8 @@ public KafkaDynamicSink(
int[] keyProjection,
int[] valueProjection,
@Nullable String keyPrefix,
- String topic,
+ @Nullable List<String> topics,
+ @Nullable Pattern topicPattern,
Properties properties,
@Nullable FlinkKafkaPartitioner<RowData> partitioner,
DeliveryGuarantee deliveryGuarantee,
@@ -166,7 +171,8 @@ public KafkaDynamicSink(
// Mutable attributes
this.metadataKeys = Collections.emptyList();
// Kafka-specific attributes
- this.topic = checkNotNull(topic, "Topic must not be null.");
+ this.topics = topics;
+ this.topicPattern = topicPattern;
this.properties = checkNotNull(properties, "Properties must not be null.");
this.partitioner = partitioner;
this.deliveryGuarantee =
@@ -206,7 +212,8 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
.setKafkaProducerConfig(properties)
.setRecordSerializer(
new DynamicKafkaRecordSerializationSchema(
- topic,
+ topics,
+ topicPattern,
partitioner,
keySerialization,
valueSerialization,
@@ -250,8 +257,13 @@ public DataStreamSink> consumeDataStream(
@Override
public Map<String, DataType> listWritableMetadata() {
final Map metadataMap = new LinkedHashMap<>();
- Stream.of(WritableMetadata.values())
- .forEachOrdered(m -> metadataMap.put(m.key, m.dataType));
+ for (WritableMetadata m : WritableMetadata.values()) {
+ if (topics != null && topics.size() == 1 && WritableMetadata.TOPIC.key.equals(m.key)) {
+ // When `topic` is a singleton list, TOPIC metadata is not writable
+ continue;
+ }
+ metadataMap.put(m.key, m.dataType);
+ }
return metadataMap;
}
@@ -272,7 +284,8 @@ public DynamicTableSink copy() {
keyProjection,
valueProjection,
keyPrefix,
- topic,
+ topics,
+ topicPattern,
properties,
partitioner,
deliveryGuarantee,
@@ -306,7 +319,8 @@ public boolean equals(Object o) {
&& Arrays.equals(keyProjection, that.keyProjection)
&& Arrays.equals(valueProjection, that.valueProjection)
&& Objects.equals(keyPrefix, that.keyPrefix)
- && Objects.equals(topic, that.topic)
+ && Objects.equals(topics, that.topics)
+ && Objects.equals(String.valueOf(topicPattern), String.valueOf(that.topicPattern))
&& Objects.equals(properties, that.properties)
&& Objects.equals(partitioner, that.partitioner)
&& Objects.equals(deliveryGuarantee, that.deliveryGuarantee)
@@ -327,7 +341,8 @@ public int hashCode() {
keyProjection,
valueProjection,
keyPrefix,
- topic,
+ topics,
+ topicPattern,
properties,
partitioner,
deliveryGuarantee,
@@ -393,6 +408,20 @@ private RowData.FieldGetter[] getFieldGetters(
// --------------------------------------------------------------------------------------------
enum WritableMetadata {
+ TOPIC(
+ "topic",
+ DataTypes.STRING().notNull(),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(RowData row, int pos) {
+ if (row.isNullAt(pos)) {
+ return null;
+ }
+ return row.getString(pos).toString();
+ }
+ }),
HEADERS(
"headers",
// key and value of the map are nullable to make handling easier in queries
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
index 89dda61a1..7c23923b5 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
@@ -93,9 +93,9 @@
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getBoundedOptions;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getFlinkKafkaPartitioner;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getKafkaProperties;
-import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getSourceTopicPattern;
-import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getSourceTopics;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getStartupOptions;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getTopicPattern;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getTopics;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.validateTableSinkOptions;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.validateTableSourceOptions;
@@ -222,8 +222,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
keyProjection,
valueProjection,
keyPrefix,
- getSourceTopics(tableOptions),
- getSourceTopicPattern(tableOptions),
+ getTopics(tableOptions),
+ getTopicPattern(tableOptions),
properties,
startupOptions.startupMode,
startupOptions.specificOffsets,
@@ -278,7 +278,8 @@ public DynamicTableSink createDynamicTableSink(Context context) {
keyProjection,
valueProjection,
keyPrefix,
- tableOptions.get(TOPIC).get(0),
+ getTopics(tableOptions),
+ getTopicPattern(tableOptions),
getKafkaProperties(context.getCatalogTable().getOptions()),
getFlinkKafkaPartitioner(tableOptions, context.getClassLoader()).orElse(null),
deliveryGuarantee,
@@ -423,7 +424,8 @@ protected KafkaDynamicSink createKafkaTableSink(
int[] keyProjection,
int[] valueProjection,
@Nullable String keyPrefix,
- String topic,
+ @Nullable List<String> topics,
+ @Nullable Pattern topicPattern,
Properties properties,
FlinkKafkaPartitioner<RowData> partitioner,
DeliveryGuarantee deliveryGuarantee,
@@ -437,7 +439,8 @@ protected KafkaDynamicSink createKafkaTableSink(
keyProjection,
valueProjection,
keyPrefix,
- topic,
+ topics,
+ topicPattern,
properties,
partitioner,
deliveryGuarantee,
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
index cebe27f2e..78debc175 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
@@ -66,6 +66,7 @@
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_PARALLELISM;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC_PATTERN;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TRANSACTIONAL_ID_PREFIX;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FIELDS_INCLUDE;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT;
@@ -75,8 +76,8 @@
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createValueFormatProjection;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getBoundedOptions;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getKafkaProperties;
-import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getSourceTopicPattern;
-import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getSourceTopics;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getTopicPattern;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getTopics;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.validateScanBoundedMode;
/** Upsert-Kafka factory. */
@@ -94,7 +95,6 @@ public String factoryIdentifier() {
public Set<ConfigOption<?>> requiredOptions() {
final Set> options = new HashSet<>();
options.add(PROPS_BOOTSTRAP_SERVERS);
- options.add(TOPIC);
options.add(KEY_FORMAT);
options.add(VALUE_FORMAT);
return options;
@@ -103,6 +103,8 @@ public Set> requiredOptions() {
@Override
public Set<ConfigOption<?>> optionalOptions() {
final Set> options = new HashSet<>();
+ options.add(TOPIC);
+ options.add(TOPIC_PATTERN);
options.add(KEY_FIELDS_PREFIX);
options.add(VALUE_FIELDS_INCLUDE);
options.add(SINK_PARALLELISM);
@@ -155,8 +157,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
keyValueProjections.f0,
keyValueProjections.f1,
keyPrefix,
- getSourceTopics(tableOptions),
- getSourceTopicPattern(tableOptions),
+ getTopics(tableOptions),
+ getTopicPattern(tableOptions),
properties,
earliest,
Collections.emptyMap(),
@@ -212,7 +214,8 @@ public DynamicTableSink createDynamicTableSink(Context context) {
keyValueProjections.f0,
keyValueProjections.f1,
keyPrefix,
- tableOptions.get(TOPIC).get(0),
+ getTopics(tableOptions),
+ getTopicPattern(tableOptions),
properties,
null,
tableOptions.get(DELIVERY_GUARANTEE),
@@ -247,7 +250,6 @@ private static void validateSource(
Format keyFormat,
Format valueFormat,
int[] primaryKeyIndexes) {
- validateTopic(tableOptions);
validateScanBoundedMode(tableOptions);
validateFormat(keyFormat, valueFormat, tableOptions);
validatePKConstraints(primaryKeyIndexes);
@@ -258,21 +260,11 @@ private static void validateSink(
Format keyFormat,
Format valueFormat,
int[] primaryKeyIndexes) {
- validateTopic(tableOptions);
validateFormat(keyFormat, valueFormat, tableOptions);
validatePKConstraints(primaryKeyIndexes);
validateSinkBufferFlush(tableOptions);
}
- private static void validateTopic(ReadableConfig tableOptions) {
- List<String> topic = tableOptions.get(TOPIC);
- if (topic.size() > 1) {
- throw new ValidationException(
- "The 'upsert-kafka' connector doesn't support topic list now. "
- + "Please use single topic as the value of the parameter 'topic'.");
- }
- }
-
private static void validateFormat(
Format keyFormat, Format valueFormat, ReadableConfig tableOptions) {
if (!keyFormat.getChangelogMode().containsOnly(RowKind.INSERT)) {
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchemaTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchemaTest.java
new file mode 100644
index 000000000..6371ae5ef
--- /dev/null
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchemaTest.java
@@ -0,0 +1,148 @@
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link DynamicKafkaRecordSerializationSchema}. */
+public class DynamicKafkaRecordSerializationSchemaTest extends TestLogger {
+ private static final List<String> MULTIPLE_TOPICS = Arrays.asList("topic1", "topic2");
+ private static final String SINGLE_TOPIC = "topic";
+ private static final Pattern TOPIC_PATTERN = Pattern.compile("topic*");
+
+ @ParameterizedTest
+ @MethodSource("provideTopicMetadataTestParameters")
+ public void testTopicMetadata(
+ List<String> topics, Pattern topicPattern, String rowTopic, String expectedTopic) {
+ GenericRowData rowData = createRowData(rowTopic);
+ DynamicKafkaRecordSerializationSchema schema = createSchema(topics, topicPattern);
+ KafkaRecordSerializationSchema.KafkaSinkContext context = createContext();
+
+ // Call serialize method
+ ProducerRecord<byte[], byte[]> record = schema.serialize(rowData, context, null);
+
+ // Assert the returned ProducerRecord is routed to the correct topic
+ assertThat(record.topic()).isEqualTo(expectedTopic);
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideInvalidTopicMetadataTestParameters")
+ public void testInvalidTopicMetadata(
+ List<String> topics, Pattern topicPattern, String rowTopic, String expectedError) {
+ GenericRowData rowData = createRowData(rowTopic);
+ DynamicKafkaRecordSerializationSchema schema = createSchema(topics, topicPattern);
+ KafkaRecordSerializationSchema.KafkaSinkContext context = createContext();
+
+ // Call serialize method
+ assertThatThrownBy(() -> schema.serialize(rowData, context, null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining(expectedError);
+ }
+
+ private static Stream<Arguments> provideTopicMetadataTestParameters() {
+ String topic1 = "topic1";
+ return Stream.of(
+ Arguments.of(
+ Collections.singletonList(SINGLE_TOPIC), null, SINGLE_TOPIC, SINGLE_TOPIC),
+ Arguments.of(Collections.singletonList(SINGLE_TOPIC), null, topic1, SINGLE_TOPIC),
+ Arguments.of(Collections.singletonList(SINGLE_TOPIC), null, null, SINGLE_TOPIC),
+ Arguments.of(MULTIPLE_TOPICS, null, topic1, topic1),
+ Arguments.of(null, TOPIC_PATTERN, SINGLE_TOPIC, SINGLE_TOPIC));
+ }
+
+ private static Stream<Arguments> provideInvalidTopicMetadataTestParameters() {
+ String other = "other";
+ return Stream.of(
+ Arguments.of(
+ MULTIPLE_TOPICS,
+ null,
+ other,
+ String.format(
+ "The topic of the sink record is not valid. Expected topic to be in: %s but was: %s",
+ MULTIPLE_TOPICS, other)),
+ Arguments.of(
+ null,
+ TOPIC_PATTERN,
+ other,
+ String.format(
+ "The topic of the sink record is not valid. Expected topic to match: %s but was: %s",
+ "topic*", other)));
+ }
+
+ private DynamicKafkaRecordSerializationSchema createSchema(
+ List<String> topics, Pattern topicPattern) {
+ // Create a SerializationSchema for RowData
+ SerializationSchema<RowData> serializationSchema =
+ new SerializationSchema<RowData>() {
+ @Override
+ public byte[] serialize(RowData element) {
+ return ((StringData) element.getString(0)).toBytes();
+ }
+
+ @Override
+ public void open(InitializationContext context) throws Exception {}
+ };
+
+ int[] metadataPositions = new int[3];
+ metadataPositions[KafkaDynamicSink.WritableMetadata.TOPIC.ordinal()] = 1;
+ metadataPositions[KafkaDynamicSink.WritableMetadata.HEADERS.ordinal()] = 2;
+ metadataPositions[KafkaDynamicSink.WritableMetadata.TIMESTAMP.ordinal()] = 3;
+
+ return new DynamicKafkaRecordSerializationSchema(
+ topics,
+ topicPattern,
+ null,
+ null,
+ serializationSchema,
+ new RowData.FieldGetter[] {r -> r.getString(0)},
+ new RowData.FieldGetter[] {r -> r.getString(0)},
+ true,
+ metadataPositions,
+ false);
+ }
+
+ private GenericRowData createRowData(String topic) {
+ GenericRowData rowData = new GenericRowData(4);
+ rowData.setField(0, StringData.fromString("test"));
+ rowData.setField(1, StringData.fromString(topic));
+ rowData.setField(2, null);
+ rowData.setField(3, null);
+ return rowData;
+ }
+
+ private KafkaRecordSerializationSchema.KafkaSinkContext createContext() {
+ return new KafkaRecordSerializationSchema.KafkaSinkContext() {
+ @Override
+ public int getParallelInstanceId() {
+ return 0;
+ }
+
+ @Override
+ public int getNumberOfParallelInstances() {
+ return 1;
+ }
+
+ @Override
+ public int[] getPartitionsForTopic(String topic) {
+ return new int[] {0};
+ }
+ };
+ }
+}
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
index 1246d53a3..c1d796d08 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
@@ -600,7 +600,8 @@ public void testTableSink() {
new int[0],
new int[] {0, 1, 2},
null,
- TOPIC,
+ Collections.singletonList(TOPIC),
+ null,
KAFKA_SINK_PROPERTIES,
new FlinkFixedPartitioner<>(),
DeliveryGuarantee.EXACTLY_ONCE,
@@ -616,6 +617,10 @@ public void testTableSink() {
final SinkV2Provider sinkProvider = (SinkV2Provider) provider;
final Sink<RowData> sinkFunction = sinkProvider.createSink();
assertThat(sinkFunction).isInstanceOf(KafkaSink.class);
+ assertThat(actualKafkaSink.listWritableMetadata())
+ .containsOnlyKeys(
+ KafkaDynamicSink.WritableMetadata.HEADERS.key,
+ KafkaDynamicSink.WritableMetadata.TIMESTAMP.key);
}
@Test
@@ -640,7 +645,8 @@ public void testTableSinkSemanticTranslation() {
new int[0],
new int[] {0, 1, 2},
null,
- TOPIC,
+ Collections.singletonList(TOPIC),
+ null,
KAFKA_SINK_PROPERTIES,
new FlinkFixedPartitioner<>(),
DeliveryGuarantee.valueOf(semantic.toUpperCase().replace("-", "_")),
@@ -683,7 +689,8 @@ public void testTableSinkWithKeyValue() {
new int[] {0},
new int[] {1, 2},
null,
- TOPIC,
+ Collections.singletonList(TOPIC),
+ null,
KAFKA_FINAL_SINK_PROPERTIES,
new FlinkFixedPartitioner<>(),
DeliveryGuarantee.EXACTLY_ONCE,
@@ -711,7 +718,8 @@ public void testTableSinkWithParallelism() {
new int[0],
new int[] {0, 1, 2},
null,
- TOPIC,
+ Collections.singletonList(TOPIC),
+ null,
KAFKA_SINK_PROPERTIES,
new FlinkFixedPartitioner<>(),
DeliveryGuarantee.EXACTLY_ONCE,
@@ -805,6 +813,77 @@ public void testTableSinkAutoCompleteSchemaRegistrySubject() {
"sub2");
}
+ @Test
+ public void testTableSinkWithTopicList() {
+ final Map<String, String> modifiedOptions =
+ getModifiedOptions(getBasicSinkOptions(), options -> options.put("topic", TOPICS));
+ KafkaDynamicSink actualSink = (KafkaDynamicSink) createTableSink(SCHEMA, modifiedOptions);
+
+ final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat =
+ new EncodingFormatMock(",");
+
+ final DynamicTableSink expectedSink =
+ createExpectedSink(
+ SCHEMA_DATA_TYPE,
+ null,
+ valueEncodingFormat,
+ new int[0],
+ new int[] {0, 1, 2},
+ null,
+ Arrays.asList(TOPICS.split(";")),
+ null,
+ KAFKA_SINK_PROPERTIES,
+ new FlinkFixedPartitioner<>(),
+ DeliveryGuarantee.EXACTLY_ONCE,
+ null,
+ "kafka-sink");
+ assertThat(actualSink).isEqualTo(expectedSink);
+ final KafkaDynamicSink actualKafkaSink = (KafkaDynamicSink) actualSink;
+ assertThat(actualKafkaSink.listWritableMetadata())
+ .containsOnlyKeys(
+ KafkaDynamicSink.WritableMetadata.TOPIC.key,
+ KafkaDynamicSink.WritableMetadata.HEADERS.key,
+ KafkaDynamicSink.WritableMetadata.TIMESTAMP.key);
+ }
+
+ @Test
+ public void testTableSinkWithTopicPattern() {
+ final Map<String, String> modifiedOptions =
+ getModifiedOptions(
+ getBasicSinkOptions(),
+ options -> {
+ options.remove("topic");
+ options.put("topic-pattern", TOPIC_REGEX);
+ });
+ KafkaDynamicSink actualSink = (KafkaDynamicSink) createTableSink(SCHEMA, modifiedOptions);
+
+ final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat =
+ new EncodingFormatMock(",");
+
+ final DynamicTableSink expectedSink =
+ createExpectedSink(
+ SCHEMA_DATA_TYPE,
+ null,
+ valueEncodingFormat,
+ new int[0],
+ new int[] {0, 1, 2},
+ null,
+ null,
+ Pattern.compile(TOPIC_REGEX),
+ KAFKA_SINK_PROPERTIES,
+ new FlinkFixedPartitioner<>(),
+ DeliveryGuarantee.EXACTLY_ONCE,
+ null,
+ "kafka-sink");
+ assertThat(actualSink).isEqualTo(expectedSink);
+ final KafkaDynamicSink actualKafkaSink = (KafkaDynamicSink) actualSink;
+ assertThat(actualKafkaSink.listWritableMetadata())
+ .containsOnlyKeys(
+ KafkaDynamicSink.WritableMetadata.TOPIC.key,
+ KafkaDynamicSink.WritableMetadata.HEADERS.key,
+ KafkaDynamicSink.WritableMetadata.TIMESTAMP.key);
+ }
+
private void verifyEncoderSubject(
Consumer