Skip to content

Commit

Permalink
Merge pull request #45816 from mcruzdev/config-kafka-client
Browse files Browse the repository at this point in the history
Convert kafka-client extension to use @ConfigMapping
  • Loading branch information
gsmet authored Jan 24, 2025
2 parents 4a53e63 + 37fb63f commit cb7b1a1
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 68 deletions.
3 changes: 0 additions & 3 deletions extensions/kafka-client/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,6 @@
<version>${project.version}</version>
</path>
</annotationProcessorPaths>
<compilerArgs>
<arg>-AlegacyConfigRoot=true</arg>
</compilerArgs>
</configuration>
</execution>
</executions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ private boolean hasKafkaChannelWithoutBootstrapServers() {
}

private KafkaDevServiceCfg getConfiguration(KafkaBuildTimeConfig cfg) {
KafkaDevServicesBuildTimeConfig devServicesConfig = cfg.devservices;
KafkaDevServicesBuildTimeConfig devServicesConfig = cfg.devservices();
return new KafkaDevServiceCfg(devServicesConfig);
}

Expand All @@ -324,17 +324,17 @@ private static final class KafkaDevServiceCfg {
private final RedpandaBuildTimeConfig redpanda;

public KafkaDevServiceCfg(KafkaDevServicesBuildTimeConfig config) {
this.devServicesEnabled = config.enabled.orElse(true);
this.provider = config.provider;
this.imageName = config.imageName.orElseGet(provider::getDefaultImageName);
this.fixedExposedPort = config.port.orElse(0);
this.shared = config.shared;
this.serviceName = config.serviceName;
this.topicPartitions = config.topicPartitions;
this.topicPartitionsTimeout = config.topicPartitionsTimeout;
this.containerEnv = config.containerEnv;

this.redpanda = config.redpanda;
this.devServicesEnabled = config.enabled().orElse(true);
this.provider = config.provider();
this.imageName = config.imageName().orElseGet(provider::getDefaultImageName);
this.fixedExposedPort = config.port().orElse(0);
this.shared = config.shared();
this.serviceName = config.serviceName();
this.topicPartitions = config.topicPartitions();
this.topicPartitionsTimeout = config.topicPartitionsTimeout();
this.containerEnv = config.containerEnv();

this.redpanda = config.redpanda();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,44 +1,49 @@
package io.quarkus.kafka.client.deployment;

import io.quarkus.runtime.annotations.ConfigDocSection;
import io.quarkus.runtime.annotations.ConfigItem;
import io.quarkus.runtime.annotations.ConfigPhase;
import io.quarkus.runtime.annotations.ConfigRoot;
import io.smallrye.config.ConfigMapping;
import io.smallrye.config.WithDefault;
import io.smallrye.config.WithName;

@ConfigRoot(name = "kafka", phase = ConfigPhase.BUILD_TIME)
public class KafkaBuildTimeConfig {
@ConfigMapping(prefix = "quarkus.kafka")
@ConfigRoot(phase = ConfigPhase.BUILD_TIME)
public interface KafkaBuildTimeConfig {
/**
* Whether a health check is published in case the smallrye-health extension is present.
* <p>
* If you enable the health check, you must specify the `kafka.bootstrap.servers` property.
*/
@ConfigItem(name = "health.enabled", defaultValue = "false")
public boolean healthEnabled;
@WithName("health.enabled")
@WithDefault("false")
boolean healthEnabled();

/**
* Whether to enable Snappy in native mode.
* <p>
* Note that Snappy requires GraalVM 21+ and embeds a native library in the native executable.
* This library is unpacked and loaded when the application starts.
*/
@ConfigItem(name = "snappy.enabled", defaultValue = "false")
public boolean snappyEnabled;
@WithName("snappy.enabled")
@WithDefault("false")
boolean snappyEnabled();

/**
* Whether to load the Snappy native library from the shared classloader.
* This setting is only used in tests if the tests are using different profiles, which would lead to
* unsatisfied link errors when loading Snappy.
*/
@ConfigItem(name = "snappy.load-from-shared-classloader", defaultValue = "false")
public boolean snappyLoadFromSharedClassLoader;
@WithName("snappy.load-from-shared-classloader")
@WithDefault("false")
boolean snappyLoadFromSharedClassLoader();

/**
* Dev Services.
* <p>
* Dev Services allows Quarkus to automatically start Kafka in dev and test mode.
*/
@ConfigItem
@ConfigDocSection(generated = true)
public KafkaDevServicesBuildTimeConfig devservices;
KafkaDevServicesBuildTimeConfig devservices();

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,24 @@
import java.util.Optional;

import io.quarkus.runtime.annotations.ConfigDocMapKey;
import io.quarkus.runtime.annotations.ConfigGroup;
import io.quarkus.runtime.annotations.ConfigItem;
import io.smallrye.config.WithDefault;

@ConfigGroup
public class KafkaDevServicesBuildTimeConfig {
public interface KafkaDevServicesBuildTimeConfig {

/**
* If Dev Services for Kafka has been explicitly enabled or disabled. Dev Services are generally enabled
* by default, unless there is an existing configuration present. For Kafka, Dev Services starts a broker unless
* {@code kafka.bootstrap.servers} is set or if all the Reactive Messaging Kafka channel are configured with a
* {@code bootstrap.servers}.
*/
@ConfigItem
public Optional<Boolean> enabled = Optional.empty();
Optional<Boolean> enabled();

/**
* Optional fixed port the dev service will listen to.
* <p>
* If not defined, the port will be chosen randomly.
*/
@ConfigItem
public Optional<Integer> port;
Optional<Integer> port();

/**
* Kafka dev service container type.
Expand All @@ -44,10 +40,10 @@ public class KafkaDevServicesBuildTimeConfig {
* <p>
* Note that Strimzi and Kafka Native images are launched in Kraft mode.
*/
@ConfigItem(defaultValue = "redpanda")
public Provider provider = Provider.REDPANDA;
@WithDefault("redpanda")
Provider provider();

public enum Provider {
enum Provider {
REDPANDA("docker.io/redpandadata/redpanda:v24.1.2"),
STRIMZI("quay.io/strimzi-test-container/test-container:latest-kafka-3.7.0"),
KAFKA_NATIVE("quay.io/ogunalp/kafka-native:latest");
Expand All @@ -68,8 +64,7 @@ public String getDefaultImageName() {
* <p>
* Dependent on the provider.
*/
@ConfigItem
public Optional<String> imageName;
Optional<String> imageName();

/**
* Indicates if the Kafka broker managed by Quarkus Dev Services is shared.
Expand All @@ -82,8 +77,8 @@ public String getDefaultImageName() {
* <p>
* Container sharing is only used in dev mode.
*/
@ConfigItem(defaultValue = "true")
public boolean shared;
@WithDefault("true")
boolean shared();

/**
* The value of the {@code quarkus-dev-service-kafka} label attached to the started container.
Expand All @@ -95,8 +90,8 @@ public String getDefaultImageName() {
* <p>
* This property is used when you need multiple shared Kafka brokers.
*/
@ConfigItem(defaultValue = "kafka")
public String serviceName;
@WithDefault("kafka")
String serviceName();

/**
* The topic-partition pairs to create in the Dev Services Kafka broker.
Expand All @@ -106,29 +101,26 @@ public String getDefaultImageName() {
* <p>
* The topic creation will not try to re-partition existing topics with different number of partitions.
*/
@ConfigItem
@ConfigDocMapKey("topic-name")
public Map<String, Integer> topicPartitions;
Map<String, Integer> topicPartitions();

/**
* Timeout for admin client calls used in topic creation.
* <p>
* Defaults to 2 seconds.
*/
@ConfigItem(defaultValue = "2S")
public Duration topicPartitionsTimeout;
@WithDefault("2S")
Duration topicPartitionsTimeout();

/**
* Environment variables that are passed to the container.
*/
@ConfigItem
@ConfigDocMapKey("environment-variable-name")
public Map<String, String> containerEnv;
Map<String, String> containerEnv();

/**
* Allows configuring the Redpanda broker.
*/
@ConfigItem
public RedpandaBuildTimeConfig redpanda;
RedpandaBuildTimeConfig redpanda();

}
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ public void handleSnappyInNative(BuildProducer<ReflectiveClassBuildItem> reflect
void loadSnappyIfEnabled(LaunchModeBuildItem launch, SnappyRecorder recorder, KafkaBuildTimeConfig config) {
boolean loadFromSharedClassLoader = false;
if (launch.isTest()) {
loadFromSharedClassLoader = config.snappyLoadFromSharedClassLoader;
loadFromSharedClassLoader = config.snappyLoadFromSharedClassLoader();
}
recorder.loadSnappy(loadFromSharedClassLoader);
}
Expand Down Expand Up @@ -482,7 +482,7 @@ public void accept(ClassInfo c) {
@BuildStep
HealthBuildItem addHealthCheck(KafkaBuildTimeConfig buildTimeConfig) {
return new HealthBuildItem("io.quarkus.kafka.client.health.KafkaHealthCheck",
buildTimeConfig.healthEnabled);
buildTimeConfig.healthEnabled());
}

@BuildStep
Expand Down Expand Up @@ -546,7 +546,7 @@ public HasSnappy(KafkaBuildTimeConfig config) {

@Override
public boolean getAsBoolean() {
return QuarkusClassLoader.isClassPresentAtRuntime("org.xerial.snappy.OSInfo") && config.snappyEnabled;
return QuarkusClassLoader.isClassPresentAtRuntime("org.xerial.snappy.OSInfo") && config.snappyEnabled();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,15 @@

import java.util.Optional;

import io.quarkus.runtime.annotations.ConfigGroup;
import io.quarkus.runtime.annotations.ConfigItem;
import io.smallrye.config.WithDefault;

/**
* Allows configuring the Redpanda broker.
* Notice that Redpanda is not a "genuine" Kafka, it's a 100% compatible implementation of the protocol.
*
* Find more info about Redpanda on <a href="https://redpanda.com/">https://redpanda.com/</a>.
*/
@ConfigGroup
public class RedpandaBuildTimeConfig {
public interface RedpandaBuildTimeConfig {

/**
* Enables transaction support.
Expand All @@ -28,14 +26,13 @@ public class RedpandaBuildTimeConfig {
* <a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=89068820">KIP-360 (Improve reliability of
* idempotent/transactional producer)</a> are <em>not</em> supported.
*/
@ConfigItem(defaultValue = "true")
public boolean transactionEnabled;
@WithDefault("true")
boolean transactionEnabled();

/**
* Port to access the Redpanda HTTP Proxy (<a href="https://docs.redpanda.com/current/develop/http-proxy/">pandaproxy</a>).
* <p>
* If not defined, the port will be chosen randomly.
*/
@ConfigItem
public Optional<Integer> proxyPort;
Optional<Integer> proxyPort();
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ protected void containerIsStarting(InspectContainerResponse containerInfo, boole
command += String.format("--kafka-addr %s ", getKafkaAddresses());
command += String.format("--advertise-kafka-addr %s ", getKafkaAdvertisedAddresses());
command += "--set redpanda.auto_create_topics_enabled=true ";
if (redpandaConfig.transactionEnabled) {
if (redpandaConfig.transactionEnabled()) {
command += "--set redpanda.enable_idempotence=true ";
command += "--set redpanda.enable_transactions=true ";
}
Expand Down Expand Up @@ -104,8 +104,8 @@ protected void configure() {
addFixedExposedPort(fixedExposedPort, DevServicesKafkaProcessor.KAFKA_PORT);
}

if (redpandaConfig.proxyPort.isPresent()) {
addFixedExposedPort(redpandaConfig.proxyPort.get(), PANDAPROXY_PORT);
if (redpandaConfig.proxyPort().isPresent()) {
addFixedExposedPort(redpandaConfig.proxyPort().get(), PANDAPROXY_PORT);
} else {
addExposedPort(PANDAPROXY_PORT);
}
Expand Down
3 changes: 0 additions & 3 deletions extensions/kafka-client/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,6 @@
<version>${project.version}</version>
</path>
</annotationProcessorPaths>
<compilerArgs>
<arg>-AlegacyConfigRoot=true</arg>
</compilerArgs>
</configuration>
</execution>
</executions>
Expand Down

0 comments on commit cb7b1a1

Please sign in to comment.