From f6a077a9dd8d1d5e43fc545cc9baab227d8438a0 Mon Sep 17 00:00:00 2001 From: Mingliang Liu Date: Tue, 3 Dec 2024 05:11:18 -0800 Subject: [PATCH] [FLINK-36780] Kafka source disable partition discovery unexpectedly (#136) --- .../kafka/source/KafkaSourceBuilder.java | 7 +++--- .../kafka/source/KafkaSourceBuilderTest.java | 23 +++++++++++++++++++ 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java index 78a4b0b60..0709afe0b 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java @@ -474,10 +474,9 @@ private void parseAndSetRequiredProperties() { true); // If the source is bounded, do not run periodic partition discovery. - maybeOverride( - KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(), - "-1", - boundedness == Boundedness.BOUNDED); + if (boundedness == Boundedness.BOUNDED) { + maybeOverride(KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(), "-1", true); + } // If the client id prefix is not set, reuse the consumer group id as the client id prefix, // or generate a random string if consumer group id is not specified. diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java index 2829f01e0..ca777bc73 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java @@ -217,6 +217,29 @@ public void testSettingInvalidCustomDeserializers( .hasMessageContaining(expectedError); } + @Test + public void testDefaultPartitionDiscovery() { + final KafkaSource kafkaSource = getBasicBuilder().build(); + // Commit on checkpoint and auto commit should be disabled because group.id is not specified + assertThat( + kafkaSource + .getConfiguration() + .get(KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS)) + .isEqualTo(KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.defaultValue()); + } + + @Test + public void testPeriodPartitionDiscovery() { + final KafkaSource kafkaSource = + getBasicBuilder().setBounded(OffsetsInitializer.latest()).build(); + // Commit on checkpoint and auto commit should be disabled because group.id is not specified + assertThat( + kafkaSource + .getConfiguration() + .get(KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS)) + .isEqualTo(-1L); + } + private KafkaSourceBuilder getBasicBuilder() { return new KafkaSourceBuilder() .setBootstrapServers("testServer")