Skip to content

Commit

Permalink
[FLINK-36210] Optimize the logic for fetching topic metadata in the T…
Browse files Browse the repository at this point in the history
…opicPatternSubscriber mode (#117)

In TopicPatternSubscriber mode, our current logic for fetch topic metadata for all topics and then filtering it. We can optimize this by first filtering the topic names and then fetch metadata only for the filtered topics.

Co-authored-by: ClownXC <[email protected]>
  • Loading branch information
xiaochen-zhou and xiaochen-zhou authored Sep 12, 2024
1 parent 3730005 commit 7929b16
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/** The base implementations of {@link KafkaSubscriber}. */
class KafkaSubscriberUtils {
Expand All @@ -38,6 +40,22 @@ static Map<String, TopicDescription> getAllTopicMetadata(AdminClient adminClient
}
}

static Map<String, TopicDescription> getTopicMetadata(
AdminClient adminClient, Pattern topicPattern) {
try {
Set<String> allTopicNames = adminClient.listTopics().names().get();
Set<String> matchedTopicNames =
allTopicNames.stream()
.filter(name -> topicPattern.matcher(name).matches())
.collect(Collectors.toSet());
return getTopicMetadata(adminClient, matchedTopicNames);
} catch (Exception e) {
throw new RuntimeException(
String.format("Failed to get metadata for %s topics.", topicPattern.pattern()),
e);
}
}

static Map<String, TopicDescription> getTopicMetadata(
AdminClient adminClient, Set<String> topicNames) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import java.util.Set;
import java.util.regex.Pattern;

import static org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getAllTopicMetadata;
import static org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata;

/** A subscriber to a topic pattern. */
class TopicPatternSubscriber implements KafkaSubscriber {
Expand All @@ -44,19 +44,17 @@ class TopicPatternSubscriber implements KafkaSubscriber {

@Override
public Set<TopicPartition> getSubscribedTopicPartitions(AdminClient adminClient) {
LOG.debug("Fetching descriptions for all topics on Kafka cluster");
final Map<String, TopicDescription> allTopicMetadata = getAllTopicMetadata(adminClient);
LOG.debug("Fetching descriptions for {} topics on Kafka cluster", topicPattern.pattern());
final Map<String, TopicDescription> matchedTopicMetadata =
getTopicMetadata(adminClient, topicPattern);

Set<TopicPartition> subscribedTopicPartitions = new HashSet<>();

allTopicMetadata.forEach(
matchedTopicMetadata.forEach(
(topicName, topicDescription) -> {
if (topicPattern.matcher(topicName).matches()) {
for (TopicPartitionInfo partition : topicDescription.partitions()) {
subscribedTopicPartitions.add(
new TopicPartition(
topicDescription.name(), partition.partition()));
}
for (TopicPartitionInfo partition : topicDescription.partitions()) {
subscribedTopicPartitions.add(
new TopicPartition(topicDescription.name(), partition.partition()));
}
});

Expand Down

0 comments on commit 7929b16

Please sign in to comment.