Skip to content

Commit

Permalink
change patternTopic* to matchedTopic*
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaochen-zhou committed Sep 5, 2024
1 parent e2b706f commit c162696
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ static Map<String, TopicDescription> getAllTopicMetadata(AdminClient adminClient
static Map<String, TopicDescription> getTopicMetadata(AdminClient adminClient, Pattern topicPattern) {
try {
Set<String> allTopicNames = adminClient.listTopics().names().get();
Set<String> patternTopicNames = allTopicNames.stream()
Set<String> matchedTopicNames = allTopicNames.stream()
.filter(name -> topicPattern.matcher(name).matches())
.collect(Collectors.toSet());
return getTopicMetadata(adminClient, patternTopicNames);
return getTopicMetadata(adminClient, matchedTopicNames);
} catch (Exception e) {
throw new RuntimeException(
String.format("Failed to get metadata for %s topics.", topicPattern.pattern()), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ class TopicPatternSubscriber implements KafkaSubscriber {
@Override
public Set<TopicPartition> getSubscribedTopicPartitions(AdminClient adminClient) {
LOG.debug("Fetching descriptions for {} topics on Kafka cluster", topicPattern.pattern());
final Map<String, TopicDescription> patternTopicMetadata = getTopicMetadata(adminClient, topicPattern);
final Map<String, TopicDescription> matchedTopicMetadata = getTopicMetadata(adminClient, topicPattern);

Set<TopicPartition> subscribedTopicPartitions = new HashSet<>();

patternTopicMetadata.forEach(
matchedTopicMetadata.forEach(
(topicName, topicDescription) -> {
for (TopicPartitionInfo partition : topicDescription.partitions()) {
subscribedTopicPartitions.add(
Expand Down

0 comments on commit c162696

Please sign in to comment.