[FLINK-35228][Connectors/Kafka] Fix: DynamicKafkaSource does not read re-added topic for the same cluster (#97)
IgnasD authored Apr 29, 2024
1 parent 369e7be commit 00c9c8c
Showing 4 changed files with 284 additions and 34 deletions.
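
In brief: as the old lines in the first hunk show, the recreated per-cluster enumerator previously received its prior KafkaSourceEnumState via getOrDefault, while the filtered partition set computed in the loop was never the value actually passed to createEnumeratorWithAssignedTopicPartitions. Partitions of a topic dropped from the stream metadata could therefore stay recorded as assigned, and when the same topic was added back the enumerator skipped it. The fix rebuilds the restored state with both assignedPartitions() and unassignedInitialPartitions() filtered down to the currently active topics. A minimal, self-contained sketch of that filtering step follows; the class and method names are illustrative only (the real logic lives inside onHandleSubscribedStreamsFetch, first hunk below), and the KafkaSourceEnumState import path is assumed from the connector's package layout.

    import java.util.Set;
    import java.util.stream.Collectors;

    import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState;
    import org.apache.kafka.common.TopicPartition;

    final class ActiveTopicStateFilter {

        /**
         * Keep only partitions whose topic is still active, for both the assigned and the
         * unassigned-initial sets, so a topic that is removed and later re-added is treated
         * as new (and therefore re-read) on the next enumerator restart.
         */
        static KafkaSourceEnumState filterToActiveTopics(
                KafkaSourceEnumState state, Set<String> activeTopics) {
            Set<TopicPartition> activeAssigned =
                    state.assignedPartitions().stream()
                            .filter(tp -> activeTopics.contains(tp.topic()))
                            .collect(Collectors.toSet());
            Set<TopicPartition> activeUnassignedInitial =
                    state.unassignedInitialPartitions().stream()
                            .filter(tp -> activeTopics.contains(tp.topic()))
                            .collect(Collectors.toSet());
            return new KafkaSourceEnumState(
                    activeAssigned, activeUnassignedInitial, state.initialDiscoveryFinished());
        }
    }
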
@@ -288,36 +288,42 @@ private void onHandleSubscribedStreamsFetch(Set<KafkaStream> fetchedKafkaStreams
 
         // create enumerators
         for (Entry<String, Set<String>> activeClusterTopics : latestClusterTopicsMap.entrySet()) {
-            final Set<TopicPartition> activeTopicPartitions = new HashSet<>();
-
-            if (dynamicKafkaSourceEnumState
-                            .getClusterEnumeratorStates()
-                            .get(activeClusterTopics.getKey())
-                    != null) {
-                Set<TopicPartition> oldTopicPartitions =
-                        dynamicKafkaSourceEnumState
-                                .getClusterEnumeratorStates()
-                                .get(activeClusterTopics.getKey())
-                                .assignedPartitions();
+            KafkaSourceEnumState kafkaSourceEnumState =
+                    dynamicKafkaSourceEnumState
+                            .getClusterEnumeratorStates()
+                            .get(activeClusterTopics.getKey());
+
+            final KafkaSourceEnumState newKafkaSourceEnumState;
+            if (kafkaSourceEnumState != null) {
+                final Set<String> activeTopics = activeClusterTopics.getValue();
 
                 // filter out removed topics
-                for (TopicPartition oldTopicPartition : oldTopicPartitions) {
-                    if (activeClusterTopics.getValue().contains(oldTopicPartition.topic())) {
-                        activeTopicPartitions.add(oldTopicPartition);
-                    }
-                }
+                Set<TopicPartition> activeAssignedPartitions =
+                        kafkaSourceEnumState.assignedPartitions().stream()
+                                .filter(tp -> activeTopics.contains(tp.topic()))
+                                .collect(Collectors.toSet());
+                Set<TopicPartition> activeUnassignedInitialPartitions =
+                        kafkaSourceEnumState.unassignedInitialPartitions().stream()
+                                .filter(tp -> activeTopics.contains(tp.topic()))
+                                .collect(Collectors.toSet());
+
+                newKafkaSourceEnumState =
+                        new KafkaSourceEnumState(
+                                activeAssignedPartitions,
+                                activeUnassignedInitialPartitions,
+                                kafkaSourceEnumState.initialDiscoveryFinished());
+            } else {
+                newKafkaSourceEnumState =
+                        new KafkaSourceEnumState(
+                                Collections.emptySet(), Collections.emptySet(), false);
             }
 
             // restarts enumerator from state using only the active topic partitions, to avoid
             // sending duplicate splits from enumerator
             createEnumeratorWithAssignedTopicPartitions(
                     activeClusterTopics.getKey(),
                     activeClusterTopics.getValue(),
-                    dynamicKafkaSourceEnumState
-                            .getClusterEnumeratorStates()
-                            .getOrDefault(
-                                    activeClusterTopics.getKey(),
-                                    new KafkaSourceEnumState(
-                                            Collections.emptySet(), Collections.emptySet(), false)),
+                    newKafkaSourceEnumState,
                     clusterProperties.get(activeClusterTopics.getKey()));
         }
 
@@ -82,6 +82,7 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static org.apache.flink.connector.kafka.dynamic.source.metrics.KafkaClusterMetricGroup.DYNAMIC_KAFKA_SOURCE_METRIC_GROUP;
import static org.assertj.core.api.Assertions.assertThat;
@@ -372,6 +373,132 @@ void testMigrationUsingFileMetadataService() throws Throwable {
.collect(Collectors.toList()));
}

@Test
void testTopicReAddMigrationUsingFileMetadataService() throws Throwable {
// setup topics
int kafkaClusterIdx = 0;
String topic1 = "test-topic-re-add-1";
String topic2 = "test-topic-re-add-2";
DynamicKafkaSourceTestHelper.createTopic(kafkaClusterIdx, topic1, NUM_PARTITIONS);
DynamicKafkaSourceTestHelper.createTopic(kafkaClusterIdx, topic2, NUM_PARTITIONS);

// Flink job config and env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.noRestart());
env.setParallelism(2);
Properties properties = new Properties();
properties.setProperty(
KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(), "1000");
properties.setProperty(
DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_INTERVAL_MS.key(), "5000");
properties.setProperty(
DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_FAILURE_THRESHOLD.key(),
"2");
properties.setProperty(CommonClientConfigs.GROUP_ID_CONFIG, "dynamic-kafka-src");

// create new metadata file to consume from 1 cluster
String testStreamId = "test-topic-re-add-stream";
File metadataFile = File.createTempFile(testDir.getPath() + "/metadata", ".yaml");
YamlFileMetadataService yamlFileMetadataService =
new YamlFileMetadataService(metadataFile.getPath(), Duration.ofMillis(100));
writeClusterMetadataToFile(
metadataFile,
testStreamId,
ImmutableList.of(topic1),
ImmutableList.of(
DynamicKafkaSourceTestHelper.getKafkaClusterTestEnvMetadata(
kafkaClusterIdx)));

DynamicKafkaSource<Integer> dynamicKafkaSource =
DynamicKafkaSource.<Integer>builder()
.setStreamIds(Collections.singleton(testStreamId))
.setKafkaMetadataService(yamlFileMetadataService)
.setDeserializer(
KafkaRecordDeserializationSchema.valueOnly(
IntegerDeserializer.class))
.setStartingOffsets(OffsetsInitializer.earliest())
.setProperties(properties)
.build();

DataStreamSource<Integer> stream =
env.fromSource(
dynamicKafkaSource,
WatermarkStrategy.noWatermarks(),
"dynamic-kafka-src");
List<Integer> results = new ArrayList<>();

int stage1Records =
DynamicKafkaSourceTestHelper.produceToKafka(
kafkaClusterIdx, topic1, NUM_PARTITIONS, NUM_RECORDS_PER_SPLIT, 0);
int stage2Records =
DynamicKafkaSourceTestHelper.produceToKafka(
kafkaClusterIdx,
topic2,
NUM_PARTITIONS,
NUM_RECORDS_PER_SPLIT,
stage1Records);

try (CloseableIterator<Integer> iterator = stream.executeAndCollect()) {
CommonTestUtils.waitUtil(
() -> {
try {
results.add(iterator.next());

// switch to second topic after first is read
if (results.size() == stage1Records) {
writeClusterMetadataToFile(
metadataFile,
testStreamId,
ImmutableList.of(topic2),
ImmutableList.of(
DynamicKafkaSourceTestHelper
.getKafkaClusterTestEnvMetadata(
kafkaClusterIdx)));
}

// re-add first topic again after second is read
// produce another batch to first topic
if (results.size() == stage2Records) {
DynamicKafkaSourceTestHelper.produceToKafka(
kafkaClusterIdx,
topic1,
NUM_PARTITIONS,
NUM_RECORDS_PER_SPLIT,
stage2Records);
writeClusterMetadataToFile(
metadataFile,
testStreamId,
ImmutableList.of(topic1, topic2),
ImmutableList.of(
DynamicKafkaSourceTestHelper
.getKafkaClusterTestEnvMetadata(
kafkaClusterIdx)));
}
} catch (NoSuchElementException e) {
// swallow and wait
} catch (IOException e) {
throw new UncheckedIOException(e);
} catch (Throwable e) {
throw new RuntimeException(e);
}

// first batch of topic 1 * 2 + topic 2 + second batch of topic 1
return results.size() == NUM_PARTITIONS * NUM_RECORDS_PER_SPLIT * 4;
},
Duration.ofSeconds(15),
"Could not schedule callable within timeout");
}

// verify data
Stream<Integer> expectedFullRead =
IntStream.range(0, NUM_PARTITIONS * NUM_RECORDS_PER_SPLIT * 3).boxed();
Stream<Integer> expectedReRead =
IntStream.range(0, NUM_PARTITIONS * NUM_RECORDS_PER_SPLIT).boxed();
List<Integer> expectedResults =
Stream.concat(expectedFullRead, expectedReRead).collect(Collectors.toList());
assertThat(results).containsExactlyInAnyOrderElementsOf(expectedResults);
}

@Test
void testStreamPatternSubscriber() throws Throwable {
DynamicKafkaSourceTestHelper.createTopic(0, "stream-pattern-test-1", NUM_PARTITIONS);
@@ -621,7 +748,7 @@ private void writeClusterMetadataToFile(File metadataFile, Set<KafkaStream> kafk
private void writeClusterMetadataToFile(
File metadataFile,
String streamId,
-String topic,
+List<String> topics,
List<KafkaTestBase.KafkaClusterTestEnvMetadata> kafkaClusterTestEnvMetadataList)
throws IOException {
List<YamlFileMetadataService.StreamMetadata.ClusterMetadata> clusterMetadata =
@@ -633,14 +760,27 @@ private void writeClusterMetadataToFile(
KafkaClusterTestEnvMetadata.getKafkaClusterId(),
KafkaClusterTestEnvMetadata
.getBrokerConnectionStrings(),
-ImmutableList.of(topic)))
+topics))
.collect(Collectors.toList());
YamlFileMetadataService.StreamMetadata streamMetadata =
new YamlFileMetadataService.StreamMetadata(streamId, clusterMetadata);
YamlFileMetadataService.saveToYaml(
Collections.singletonList(streamMetadata), metadataFile);
}

private void writeClusterMetadataToFile(
File metadataFile,
String streamId,
String topic,
List<KafkaTestBase.KafkaClusterTestEnvMetadata> kafkaClusterTestEnvMetadataList)
throws IOException {
writeClusterMetadataToFile(
metadataFile,
streamId,
ImmutableList.of(topic),
kafkaClusterTestEnvMetadataList);
}

private Set<String> findMetrics(InMemoryReporter inMemoryReporter, String groupPattern) {
Optional<MetricGroup> groups = inMemoryReporter.findGroup(groupPattern);
assertThat(groups).isPresent();
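For orientation, the record count that the new testTopicReAddMigrationUsingFileMetadataService test above waits for works out as follows; N is shorthand introduced here, not an identifier from the test:

    N = NUM_PARTITIONS * NUM_RECORDS_PER_SPLIT   // records per produced batch
    expected = N   // topic1, first batch, read before the metadata switch to topic2
             + N   // topic2, read while it is the only active topic
             + N   // topic1, first batch again, re-read from earliest once topic1 is re-added
             + N   // topic1, second batch, produced just before the re-add
             = 4 * NUM_PARTITIONS * NUM_RECORDS_PER_SPLIT

This is also why the expected list is the full range 0..3N-1 plus a second copy of 0..N-1, compared with containsExactlyInAnyOrderElementsOf because ordering across partitions and topics is not deterministic.
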
@@ -32,6 +32,8 @@
import org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber.KafkaStreamSetSubscriber;
import org.apache.flink.connector.kafka.dynamic.source.split.DynamicKafkaSourceSplit;
import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
import org.apache.flink.connector.kafka.source.enumerator.AssignmentStatus;
import org.apache.flink.connector.kafka.source.enumerator.TopicPartitionAndAssignmentStatus;
import org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.testutils.MockKafkaMetadataService;
@@ -464,6 +466,91 @@ public void testSnapshotState() throws Throwable {
}
}

@Test
public void testEnumeratorStateDoesNotContainStaleTopicPartitions() throws Throwable {
final String topic2 = TOPIC + "_2";

DynamicKafkaSourceTestHelper.createTopic(topic2, NUM_SPLITS_PER_CLUSTER, 1);
DynamicKafkaSourceTestHelper.produceToKafka(
topic2, NUM_SPLITS_PER_CLUSTER, NUM_RECORDS_PER_SPLIT);

final Set<KafkaStream> initialStreams =
Collections.singleton(
new KafkaStream(
TOPIC,
DynamicKafkaSourceTestHelper.getClusterMetadataMap(
0, TOPIC, topic2)));

final Set<KafkaStream> updatedStreams =
Collections.singleton(
new KafkaStream(
TOPIC,
DynamicKafkaSourceTestHelper.getClusterMetadataMap(0, TOPIC)));

try (MockKafkaMetadataService metadataService =
new MockKafkaMetadataService(initialStreams);
MockSplitEnumeratorContext<DynamicKafkaSourceSplit> context =
new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
DynamicKafkaSourceEnumerator enumerator =
createEnumerator(
context,
metadataService,
(properties) ->
properties.setProperty(
DynamicKafkaSourceOptions
.STREAM_METADATA_DISCOVERY_INTERVAL_MS
.key(),
"1"))) {
enumerator.start();

context.runPeriodicCallable(0);

runAllOneTimeCallables(context);

mockRegisterReaderAndSendReaderStartupEvent(context, enumerator, 0);
mockRegisterReaderAndSendReaderStartupEvent(context, enumerator, 1);

DynamicKafkaSourceEnumState initialState = enumerator.snapshotState(-1);

assertThat(getFilteredTopicPartitions(initialState, TOPIC, AssignmentStatus.ASSIGNED))
.hasSize(2);
assertThat(
getFilteredTopicPartitions(
initialState, TOPIC, AssignmentStatus.UNASSIGNED_INITIAL))
.hasSize(1);
assertThat(getFilteredTopicPartitions(initialState, topic2, AssignmentStatus.ASSIGNED))
.hasSize(2);
assertThat(
getFilteredTopicPartitions(
initialState, topic2, AssignmentStatus.UNASSIGNED_INITIAL))
.hasSize(1);

// mock metadata change
metadataService.setKafkaStreams(updatedStreams);

// changes should have occurred here
context.runPeriodicCallable(0);
runAllOneTimeCallables(context);

mockRegisterReaderAndSendReaderStartupEvent(context, enumerator, 2);

DynamicKafkaSourceEnumState migratedState = enumerator.snapshotState(-1);

assertThat(getFilteredTopicPartitions(migratedState, TOPIC, AssignmentStatus.ASSIGNED))
.hasSize(3);
assertThat(
getFilteredTopicPartitions(
migratedState, TOPIC, AssignmentStatus.UNASSIGNED_INITIAL))
.isEmpty();
assertThat(getFilteredTopicPartitions(migratedState, topic2, AssignmentStatus.ASSIGNED))
.isEmpty();
assertThat(
getFilteredTopicPartitions(
migratedState, topic2, AssignmentStatus.UNASSIGNED_INITIAL))
.isEmpty();
}
}

@Test
public void testStartupWithCheckpointState() throws Throwable {
// init enumerator with checkpoint state
@@ -865,6 +952,18 @@ private Map<Integer, Set<DynamicKafkaSourceSplit>> getReaderAssignments(
return readerToSplits;
}

private List<TopicPartition> getFilteredTopicPartitions(
DynamicKafkaSourceEnumState state, String topic, AssignmentStatus assignmentStatus) {
return state.getClusterEnumeratorStates().values().stream()
.flatMap(s -> s.partitions().stream())
.filter(
partition ->
partition.topicPartition().topic().equals(topic)
&& partition.assignmentStatus() == assignmentStatus)
.map(TopicPartitionAndAssignmentStatus::topicPartition)
.collect(Collectors.toList());
}

private static void runAllOneTimeCallables(MockSplitEnumeratorContext context)
throws Throwable {
while (!context.getOneTimeCallables().isEmpty()) {