-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Update word-count to streams bootstrap 3 (#28)
- Loading branch information
Showing
12 changed files
with
174 additions
and
112 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,7 +15,6 @@ streams-bootstrap-v2: | |
enabled: false | ||
|
||
replicaCount: 1 | ||
debug: true | ||
|
||
producer-app-v2: | ||
to: | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1 @@ | ||
kpops~=8.0 | ||
kpops~=8.1.4 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,3 @@ | ||
version=1.0.0-SNAPSHOT | ||
version=2.0.0-SNAPSHOT | ||
org.gradle.caching=true | ||
org.gradle.parallel=true | ||
junitVersion=5.9.2 | ||
kafkaVersion=3.3.0 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
distributionBase=GRADLE_USER_HOME | ||
distributionPath=wrapper/dists | ||
distributionUrl=https\://services.gradle.org/distributions/gradle-7.6-bin.zip | ||
distributionUrl=https\://services.gradle.org/distributions/gradle-8.11.1-bin.zip | ||
networkTimeout=10000 | ||
validateDistributionUrl=true | ||
zipStoreBase=GRADLE_USER_HOME | ||
zipStorePath=wrapper/dists |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
65 changes: 33 additions & 32 deletions
65
word-count/code/src/main/java/com/bakdata/kpops/examples/SentenceProducer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,52 +1,53 @@ | ||
package com.bakdata.kpops.examples; | ||
|
||
import com.bakdata.kafka.KafkaProducerApplication; | ||
import com.bakdata.kafka.ProducerApp; | ||
import com.bakdata.kafka.ProducerBuilder; | ||
import com.bakdata.kafka.ProducerRunnable; | ||
import com.bakdata.kafka.SerializerConfig; | ||
import com.bakdata.kafka.SimpleKafkaProducerApplication; | ||
import com.google.common.io.Resources; | ||
import java.io.IOException; | ||
import java.net.URL; | ||
import java.nio.charset.StandardCharsets; | ||
import java.util.List; | ||
import java.util.Properties; | ||
import lombok.Setter; | ||
import lombok.extern.slf4j.Slf4j; | ||
import org.apache.kafka.clients.producer.KafkaProducer; | ||
import org.apache.kafka.clients.producer.Producer; | ||
import org.apache.kafka.clients.producer.ProducerConfig; | ||
import org.apache.kafka.clients.producer.ProducerRecord; | ||
import org.apache.kafka.common.serialization.StringSerializer; | ||
|
||
import java.io.IOException; | ||
import java.net.URL; | ||
import java.nio.charset.StandardCharsets; | ||
import java.util.List; | ||
|
||
import static com.bakdata.kafka.KafkaApplication.startApplication; | ||
|
||
@Setter | ||
public class SentenceProducer extends KafkaProducerApplication { | ||
public class SentenceProducer implements ProducerApp { | ||
static final String FILE_NAME = "kpops.txt"; | ||
|
||
public static void main(final String[] args) { | ||
startApplication(new SentenceProducer(), args); | ||
startApplication( | ||
new SimpleKafkaProducerApplication<>(SentenceProducer::new), | ||
args | ||
); | ||
} | ||
|
||
@Override | ||
protected Properties createKafkaProperties() { | ||
final Properties kafkaProperties = super.createKafkaProperties(); | ||
kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); | ||
kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); | ||
return kafkaProperties; | ||
} | ||
|
||
@Override | ||
protected void runApplication() { | ||
try (final KafkaProducer<String, String> producer = this.createProducer()) { | ||
final URL url = Resources.getResource(FILE_NAME); | ||
final List<String> textLines = Resources.readLines(url, StandardCharsets.UTF_8); | ||
|
||
for (final String textLine : textLines) { | ||
this.publish(producer, textLine); | ||
public ProducerRunnable buildRunnable(final ProducerBuilder producerBuilder) { | ||
return () -> { | ||
try (final Producer<String, String> producer = producerBuilder.createProducer()) { | ||
final URL url = Resources.getResource(FILE_NAME); | ||
final List<String> textLines = Resources.readLines(url, StandardCharsets.UTF_8); | ||
final String outputTopic = producerBuilder.getTopics().getOutputTopic(); | ||
for (final String textLine : textLines) { | ||
producer.send(new ProducerRecord<>(outputTopic, null, textLine)); | ||
} | ||
producer.flush(); | ||
} catch (final IOException e) { | ||
throw new RuntimeException("Error occurred while reading the .txt file.", e); | ||
} | ||
producer.flush(); | ||
} catch (final IOException e) { | ||
throw new RuntimeException("Error occurred while reading the .txt file.", e); | ||
} | ||
}; | ||
} | ||
|
||
private void publish(final Producer<? super String, ? super String> producer, final String line) { | ||
producer.send(new ProducerRecord<>(this.getOutputTopic(), null, line)); | ||
@Override | ||
public SerializerConfig defaultSerializationConfig() { | ||
return new SerializerConfig(StringSerializer.class, StringSerializer.class); | ||
} | ||
} |
41 changes: 22 additions & 19 deletions
41
word-count/code/src/main/java/com/bakdata/kpops/examples/WordCountApplication.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,46 +1,49 @@ | ||
package com.bakdata.kpops.examples; | ||
|
||
import com.bakdata.kafka.KafkaStreamsApplication; | ||
import java.util.Arrays; | ||
import java.util.Properties; | ||
import java.util.regex.Pattern; | ||
import org.apache.kafka.clients.consumer.ConsumerConfig; | ||
import com.bakdata.kafka.KafkaApplication; | ||
import com.bakdata.kafka.SerdeConfig; | ||
import com.bakdata.kafka.SimpleKafkaStreamsApplication; | ||
import com.bakdata.kafka.StreamsApp; | ||
import com.bakdata.kafka.StreamsTopicConfig; | ||
import com.bakdata.kafka.TopologyBuilder; | ||
import org.apache.kafka.common.serialization.Serdes.StringSerde; | ||
import org.apache.kafka.streams.StreamsBuilder; | ||
import org.apache.kafka.streams.StreamsConfig; | ||
import org.apache.kafka.streams.kstream.KStream; | ||
import org.apache.kafka.streams.kstream.KTable; | ||
|
||
public class WordCountApplication extends KafkaStreamsApplication { | ||
import java.util.Arrays; | ||
import java.util.regex.Pattern; | ||
|
||
public class WordCountApplication implements StreamsApp { | ||
private static final Pattern COMPILE = Pattern.compile("\\W+"); | ||
|
||
public static void main(final String[] args) { | ||
startApplication(new WordCountApplication(), args); | ||
KafkaApplication.startApplication( | ||
new SimpleKafkaStreamsApplication<>(WordCountApplication::new), | ||
args | ||
); | ||
} | ||
|
||
@Override | ||
public void buildTopology(final StreamsBuilder builder) { | ||
final KStream<String, String> textLines = builder.stream(this.getInputTopics()); | ||
public void buildTopology(final TopologyBuilder builder) { | ||
final KStream<String, String> textLines = builder.streamInput(); | ||
final KTable<String, String> wordCounts = textLines | ||
.flatMapValues(value -> Arrays.asList(COMPILE.split(value.toLowerCase()))) | ||
.groupBy((key, value) -> value) | ||
.count() | ||
// The redis sink connection lacks a Long converter and instead relies on a string converter. | ||
.mapValues(Object::toString); | ||
|
||
wordCounts.toStream().to(this.getOutputTopic()); | ||
wordCounts.toStream() | ||
.to(builder.getTopics().getOutputTopic()); | ||
} | ||
|
||
@Override | ||
public String getUniqueAppId() { | ||
return String.format("word-count-app-%s", this.getOutputTopic()); | ||
public String getUniqueAppId(final StreamsTopicConfig topics) { | ||
return String.format("word-count-app-%s", topics.getOutputTopic()); | ||
} | ||
|
||
@Override | ||
protected Properties createKafkaProperties() { | ||
final Properties kafkaProperties = super.createKafkaProperties(); | ||
kafkaProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, StringSerde.class); | ||
kafkaProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, StringSerde.class); | ||
return kafkaProperties; | ||
public SerdeConfig defaultSerializationConfig() { | ||
return new SerdeConfig(StringSerde.class, StringSerde.class); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.