[FLINK-35808] Let `ConsumerConfig.(KEY|VALUE)_DESERIALIZER_CLASS_CONFIG` be overridable by user in `KafkaSourceBuilder` (#108)

## What is the purpose of the change

Let `ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG` and `ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG` be overridable by the user in `KafkaSourceBuilder`, in order to enable the large-message use case discussed in this [mailing list discussion](https://lists.apache.org/thread/spl88o63sjm2dv4l5no0ym632d2yt2o6).

This allows users to easily implement the [`claim check` large message pattern](https://developer.confluent.io/patterns/event-processing/claim-check/) by specifying a `value.deserializer` that resolves the claim check but otherwise passes the bytes through, keeping those concerns out of the Flink codebase.
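For illustration, a minimal sketch of such a deserializer, assuming a `claim-check:` prefix convention and leaving the external store lookup abstract (none of these names come from this change). The important part is that it implements `Deserializer<byte[]>`, so the `KafkaSource`'s own `DeserializationSchema` still receives plain bytes:

```java
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.serialization.Deserializer;

/**
 * Hypothetical pass-through claim-check deserializer. Values that carry a
 * claim-check marker are resolved against an external store; all other
 * values are returned unchanged.
 */
public class ClaimCheckDeserializer implements Deserializer<byte[]> {

    private static final byte[] MARKER = "claim-check:".getBytes(StandardCharsets.UTF_8);

    @Override
    public byte[] deserialize(String topic, byte[] data) {
        if (data == null || !startsWithMarker(data)) {
            return data; // ordinary record: pass the bytes through unchanged
        }
        String reference =
                new String(data, MARKER.length, data.length - MARKER.length, StandardCharsets.UTF_8);
        return fetchFromExternalStore(reference);
    }

    private static boolean startsWithMarker(byte[] data) {
        if (data.length < MARKER.length) {
            return false;
        }
        for (int i = 0; i < MARKER.length; i++) {
            if (data[i] != MARKER[i]) {
                return false;
            }
        }
        return true;
    }

    // Placeholder: a real implementation would read from S3, GCS, etc.
    private byte[] fetchFromExternalStore(String reference) {
        throw new UnsupportedOperationException("resolve " + reference);
    }
}
```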

Note: [overriding `value.serializer` is already supported on the producer side](https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java#L82-L83).
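For comparison, a sketch of the producer-side wiring that already works today (`ClaimCheckSerializer` is hypothetical, standing in for a `Serializer<byte[]>` that offloads large payloads to an external store and writes a claim-check reference instead):

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

import org.apache.kafka.clients.producer.ProducerConfig;

KafkaSink<String> sink =
        KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("large-messages")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                // Already honored by KafkaSinkBuilder rather than overridden.
                .setProperty(
                        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                        ClaimCheckSerializer.class.getName())
                .build();
```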

Other Reading:
https://www.kai-waehner.de/blog/2020/08/07/apache-kafka-handling-large-messages-and-files-for-image-video-audio-processing/
https://www.conduktor.io/kafka/how-to-send-large-messages-in-apache-kafka/#Option-1:-using-an-external-store-(GB-size-messages)-0

## Brief change log
- Updates key and value deserializers to be overridable by users in `KafkaSourceBuilder`; usage is sketched below
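A sketch of the consumer-side usage this unlocks (`ClaimCheckDeserializer` is the hypothetical class sketched above; servers, topic, and group id are placeholders):

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;

import org.apache.kafka.clients.consumer.ConsumerConfig;

KafkaSource<String> source =
        KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("large-messages")
                .setGroupId("claim-check-consumer")
                // No longer forced to ByteArrayDeserializer; the supplied
                // class must still implement Deserializer<byte[]>.
                .setProperty(
                        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                        ClaimCheckDeserializer.class.getName())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
```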

## Verifying this change
- [x] Tests that both key and value deserializers can be overridden
- [x] Tests ensuring that user-supplied deserializers return bytes (`byte[]`)

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: yes
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable
klam-shop authored Jul 18, 2024
1 parent 4429b78 commit 86f796a
Showing 8 changed files with 114 additions and 17 deletions.
2 changes: 0 additions & 2 deletions docs/content.zh/docs/connectors/datastream/kafka.md
@@ -222,8 +222,6 @@ Kafka Source 支持流式和批式两种运行模式。默认情况下,KafkaSo
Kafka consumer 的配置可以参考 [Apache Kafka 文档](http://kafka.apache.org/documentation/#consumerconfigs)

请注意,即使指定了以下配置项,构建器也会将其覆盖:
- ```key.deserializer``` 始终设置为 ByteArrayDeserializer
- ```value.deserializer``` 始终设置为 ByteArrayDeserializer
- ```auto.offset.reset.strategy``` 被 OffsetsInitializer#getAutoOffsetResetStrategy() 覆盖
- ```partition.discovery.interval.ms``` 会在批模式下被覆盖为 -1

2 changes: 1 addition & 1 deletion docs/content.zh/docs/connectors/table/upsert-kafka.md
@@ -136,7 +136,7 @@ of all available metadata fields.
<td>
该选项可以传递任意的 Kafka 参数。选项的后缀名必须匹配定义在 <a href="https://kafka.apache.org/documentation/#configuration">Kafka 参数文档</a>中的参数名。
Flink 会自动移除 选项名中的 "properties." 前缀,并将转换后的键名以及值传入 KafkaClient。 例如,你可以通过 <code>'properties.allow.auto.create.topics' = 'false'</code>
来禁止自动创建 topic。 但是,某些选项,例如<code>'key.deserializer'</code> 和 <code>'value.deserializer'</code> 是不允许通过该方式传递参数,因为 Flink 会重写这些参数的值。
来禁止自动创建 topic。 但是,某些选项,例如<code>'auto.offset.reset'</code> 是不允许通过该方式传递参数,因为 Flink 会重写这些参数的值。
</td>
</tr>
<tr>
2 changes: 0 additions & 2 deletions docs/content/docs/connectors/datastream/kafka.md
@@ -235,8 +235,6 @@ for more details.

Please note that the following keys will be overridden by the builder even if
it is configured:
- ```key.deserializer``` is always set to ```ByteArrayDeserializer```
- ```value.deserializer``` is always set to ```ByteArrayDeserializer```
- ```auto.offset.reset.strategy``` is overridden by ```OffsetsInitializer#getAutoOffsetResetStrategy()```
for the starting offsets
- ```partition.discovery.interval.ms``` is overridden to -1 when
2 changes: 1 addition & 1 deletion docs/content/docs/connectors/table/kafka.md
@@ -233,7 +233,7 @@ Connector Options
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>
This can set and pass arbitrary Kafka configurations. Suffix names must match the configuration key defined in <a href="https://kafka.apache.org/documentation/#configuration">Kafka Configuration documentation</a>. Flink will remove the "properties." key prefix and pass the transformed key and values to the underlying KafkaClient. For example, you can disable automatic topic creation via <code>'properties.allow.auto.create.topics' = 'false'</code>. But there are some configurations that do not support to set, because Flink will override them, e.g. <code>'key.deserializer'</code> and <code>'value.deserializer'</code>.
This can set and pass arbitrary Kafka configurations. Suffix names must match the configuration key defined in <a href="https://kafka.apache.org/documentation/#configuration">Kafka Configuration documentation</a>. Flink will remove the "properties." key prefix and pass the transformed key and values to the underlying KafkaClient. For example, you can disable automatic topic creation via <code>'properties.allow.auto.create.topics' = 'false'</code>. But there are some configurations that do not support to set, because Flink will override them, e.g. <code>'auto.offset.reset'</code>.
</td>
</tr>
<tr>
2 changes: 1 addition & 1 deletion docs/content/docs/connectors/table/upsert-kafka.md
@@ -144,7 +144,7 @@ Connector Options
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>
This can set and pass arbitrary Kafka configurations. Suffix names must match the configuration key defined in <a href="https://kafka.apache.org/documentation/#configuration">Kafka Configuration documentation</a>. Flink will remove the "properties." key prefix and pass the transformed key and values to the underlying KafkaClient. For example, you can disable automatic topic creation via <code>'properties.allow.auto.create.topics' = 'false'</code>. But there are some configurations that do not support to set, because Flink will override them, e.g. <code>'key.deserializer'</code> and <code>'value.deserializer'</code>.
This can set and pass arbitrary Kafka configurations. Suffix names must match the configuration key defined in <a href="https://kafka.apache.org/documentation/#configuration">Kafka Configuration documentation</a>. Flink will remove the "properties." key prefix and pass the transformed key and values to the underlying KafkaClient. For example, you can disable automatic topic creation via <code>'properties.allow.auto.create.topics' = 'false'</code>. But there are some configurations that do not support to set, because Flink will override them, e.g. <code>'auto.offset.reset'</code>.
</td>
</tr>
<tr>
KafkaSourceBuilder.java
@@ -31,9 +31,12 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
@@ -379,8 +382,6 @@ public KafkaSourceBuilder<OUT> setRackIdSupplier(SerializableSupplier<String> ra
* created.
*
* <ul>
* <li><code>key.deserializer</code> is always set to {@link ByteArrayDeserializer}.
* <li><code>value.deserializer</code> is always set to {@link ByteArrayDeserializer}.
* <li><code>auto.offset.reset.strategy</code> is overridden by {@link
* OffsetsInitializer#getAutoOffsetResetStrategy()} for the starting offsets, which is by
* default {@link OffsetsInitializer#earliest()}.
@@ -405,8 +406,6 @@ public KafkaSourceBuilder<OUT> setProperty(String key, String value) {
* created.
*
* <ul>
* <li><code>key.deserializer</code> is always set to {@link ByteArrayDeserializer}.
* <li><code>value.deserializer</code> is always set to {@link ByteArrayDeserializer}.
* <li><code>auto.offset.reset.strategy</code> is overridden by {@link
* OffsetsInitializer#getAutoOffsetResetStrategy()} for the starting offsets, which is by
* default {@link OffsetsInitializer#earliest()}.
@@ -457,11 +456,11 @@ private void parseAndSetRequiredProperties() {
maybeOverride(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName(),
true);
false);
maybeOverride(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName(),
true);
false);
if (!props.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
LOG.warn(
"Offset commit on checkpoint is disabled because {} is not specified",
@@ -534,6 +533,47 @@ private void sanityCheck() {
if (stoppingOffsetsInitializer instanceof OffsetsInitializerValidator) {
((OffsetsInitializerValidator) stoppingOffsetsInitializer).validate(props);
}
if (props.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) {
checkDeserializer(props.getProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
}
if (props.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) {
checkDeserializer(props.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
}
}

private void checkDeserializer(String deserializer) {
try {
Class<?> deserClass = Class.forName(deserializer);
if (!Deserializer.class.isAssignableFrom(deserClass)) {
throw new IllegalArgumentException(
String.format(
"Deserializer class %s is not a subclass of %s",
deserializer, Deserializer.class.getName()));
}

// Get the generic type information
Type[] interfaces = deserClass.getGenericInterfaces();
for (Type iface : interfaces) {
if (iface instanceof ParameterizedType) {
ParameterizedType parameterizedType = (ParameterizedType) iface;
Type rawType = parameterizedType.getRawType();

// Check if it's Deserializer<byte[]>
if (rawType == Deserializer.class) {
Type[] typeArguments = parameterizedType.getActualTypeArguments();
if (typeArguments.length != 1 || typeArguments[0] != byte[].class) {
throw new IllegalArgumentException(
String.format(
"Deserializer class %s does not deserialize byte[]",
deserializer));
}
}
}
}
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException(
String.format("Deserializer class %s not found", deserializer), e);
}
}

private boolean offsetCommitEnabledManually() {
KafkaSourceBuilderTest.java
@@ -27,15 +27,20 @@
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -191,6 +196,27 @@ public void testSettingCustomKafkaSubscriber() {
"Cannot use partitions for consumption because a ExampleCustomSubscriber is already set for consumption.");
}

@ParameterizedTest
@MethodSource("provideSettingCustomDeserializerTestParameters")
public void testSettingCustomDeserializer(String propertyKey, String propertyValue) {
final KafkaSource<String> kafkaSource =
getBasicBuilder().setProperty(propertyKey, propertyValue).build();
assertThat(
kafkaSource
.getConfiguration()
.get(ConfigOptions.key(propertyKey).stringType().noDefaultValue()))
.isEqualTo(propertyValue);
}

@ParameterizedTest
@MethodSource("provideInvalidCustomDeserializersTestParameters")
public void testSettingInvalidCustomDeserializers(
String propertyKey, String propertyValue, String expectedError) {
assertThatThrownBy(() -> getBasicBuilder().setProperty(propertyKey, propertyValue).build())
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining(expectedError);
}

private KafkaSourceBuilder<String> getBasicBuilder() {
return new KafkaSourceBuilder<String>()
.setBootstrapServers("testServer")
@@ -206,4 +232,43 @@ public Set<TopicPartition> getSubscribedTopicPartitions(AdminClient adminClient)
return Collections.singleton(new TopicPartition("topic", 0));
}
}

private static Stream<Arguments> provideSettingCustomDeserializerTestParameters() {
return Stream.of(
Arguments.of(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
TestByteArrayDeserializer.class.getName()),
Arguments.of(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
TestByteArrayDeserializer.class.getName()));
}

private static Stream<Arguments> provideInvalidCustomDeserializersTestParameters() {
String deserOne = String.class.getName();
String deserTwo = "NoneExistentClass";
String deserThree = StringDeserializer.class.getName();
return Stream.of(
Arguments.of(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
deserOne,
String.format(
"Deserializer class %s is not a subclass of org.apache.kafka.common.serialization.Deserializer",
deserOne)),
Arguments.of(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
deserTwo,
String.format("Deserializer class %s not found", deserTwo)),
Arguments.of(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
deserThree,
String.format(
"Deserializer class %s does not deserialize byte[]", deserThree)),
Arguments.of(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
deserThree,
String.format(
"Deserializer class %s does not deserialize byte[]", deserThree)));
}

private class TestByteArrayDeserializer extends ByteArrayDeserializer {}
}
4 changes: 0 additions & 4 deletions flink-python/pyflink/datastream/connectors/kafka.py
@@ -629,8 +629,6 @@ def set_property(self, key: str, value: str) -> 'KafkaSourceBuilder':
Note that the following keys will be overridden by the builder when the KafkaSource is
created.
* ``key.deserializer`` is always set to ByteArrayDeserializer.
* ``value.deserializer`` is always set to ByteArrayDeserializer.
* ``auto.offset.reset.strategy`` is overridden by AutoOffsetResetStrategy returned by
:class:`KafkaOffsetsInitializer` for the starting offsets, which is by default
:meth:`KafkaOffsetsInitializer.earliest`.
@@ -652,8 +650,6 @@ def set_properties(self, props: Dict) -> 'KafkaSourceBuilder':
Note that the following keys will be overridden by the builder when the KafkaSource is
created.
* ``key.deserializer`` is always set to ByteArrayDeserializer.
* ``value.deserializer`` is always set to ByteArrayDeserializer.
* ``auto.offset.reset.strategy`` is overridden by AutoOffsetResetStrategy returned by
:class:`KafkaOffsetsInitializer` for the starting offsets, which is by default
:meth:`KafkaOffsetsInitializer.earliest`.
