Skip to content

Commit

Permalink
Convert kafka-streams extension to use @ConfigMapping
Browse files Browse the repository at this point in the history
  • Loading branch information
mcruzdev committed Jan 23, 2025
1 parent 7d23bba commit c85dffd
Show file tree
Hide file tree
Showing 9 changed files with 84 additions and 112 deletions.
3 changes: 0 additions & 3 deletions extensions/kafka-streams/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,6 @@
<version>${project.version}</version>
</path>
</annotationProcessorPaths>
<compilerArgs>
<arg>-AlegacyConfigRoot=true</arg>
</compilerArgs>
</configuration>
</execution>
</executions>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
package io.quarkus.kafka.streams.deployment;

import io.quarkus.runtime.annotations.ConfigItem;
import io.quarkus.runtime.annotations.ConfigPhase;
import io.quarkus.runtime.annotations.ConfigRoot;
import io.smallrye.config.ConfigMapping;
import io.smallrye.config.WithDefault;
import io.smallrye.config.WithName;

@ConfigRoot(name = "kafka-streams", phase = ConfigPhase.BUILD_TIME)
public class KafkaStreamsBuildTimeConfig {
@ConfigMapping(prefix = "quarkus.kafka-streams")
@ConfigRoot(phase = ConfigPhase.BUILD_TIME)
public interface KafkaStreamsBuildTimeConfig {

/**
* Whether a health check is published in case the smallrye-health extension is present (defaults to true).
*/
@ConfigItem(name = "health.enabled", defaultValue = "true")
public boolean healthEnabled;
@WithName("health.enabled")
@WithDefault("true")
boolean healthEnabled();
}
Original file line number Diff line number Diff line change
Expand Up @@ -213,11 +213,11 @@ void addHealthChecks(KafkaStreamsBuildTimeConfig buildTimeConfig, BuildProducer<
healthChecks.produce(
new HealthBuildItem(
"io.quarkus.kafka.streams.runtime.health.KafkaStreamsTopicsHealthCheck",
buildTimeConfig.healthEnabled));
buildTimeConfig.healthEnabled()));
healthChecks.produce(
new HealthBuildItem(
"io.quarkus.kafka.streams.runtime.health.KafkaStreamsStateHealthCheck",
buildTimeConfig.healthEnabled));
buildTimeConfig.healthEnabled()));
}

@BuildStep
Expand Down
3 changes: 0 additions & 3 deletions extensions/kafka-streams/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,6 @@
<version>${project.version}</version>
</path>
</annotationProcessorPaths>
<compilerArgs>
<arg>-AlegacyConfigRoot=true</arg>
</compilerArgs>
</configuration>
</execution>
</executions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public KafkaStreamsProducer(KafkaStreamsSupport kafkaStreamsSupport, KafkaStream

Properties buildTimeProperties = kafkaStreamsSupport.getProperties();

String bootstrapServersConfig = asString(runtimeConfig.bootstrapServers);
String bootstrapServersConfig = asString(runtimeConfig.bootstrapServers());
if (DEFAULT_KAFKA_BROKER.equalsIgnoreCase(bootstrapServersConfig)) {
// Try to see if kafka.bootstrap.servers is set, if so, use that value, if not, keep localhost:9092
bootstrapServersConfig = ConfigProvider.getConfig().getOptionalValue("kafka.bootstrap.servers", String.class)
Expand All @@ -109,7 +109,7 @@ public KafkaStreamsProducer(KafkaStreamsSupport kafkaStreamsSupport, KafkaStream

this.executorService = executorService;

this.topicsTimeout = runtimeConfig.topicsTimeout;
this.topicsTimeout = runtimeConfig.topicsTimeout();
this.trimmedTopics = isTopicsCheckEnabled() ? runtimeConfig.getTrimmedTopics() : Collections.emptyList();
this.streamsConfig = new StreamsConfig(kafkaStreamsProperties);
this.kafkaStreams = initializeKafkaStreams(streamsConfig, topology.get(),
Expand Down Expand Up @@ -217,51 +217,52 @@ private static Properties getStreamsProperties(Properties properties,

// add runtime options
streamsProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig);
streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, runtimeConfig.applicationId);
streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, runtimeConfig.applicationId());

// app id
if (runtimeConfig.applicationServer.isPresent()) {
streamsProperties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, runtimeConfig.applicationServer.get());
if (runtimeConfig.applicationServer().isPresent()) {
streamsProperties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, runtimeConfig.applicationServer().get());
}

// schema registry
if (runtimeConfig.schemaRegistryUrl.isPresent()) {
streamsProperties.put(runtimeConfig.schemaRegistryKey, runtimeConfig.schemaRegistryUrl.get());
if (runtimeConfig.schemaRegistryUrl().isPresent()) {
streamsProperties.put(runtimeConfig.schemaRegistryKey(), runtimeConfig.schemaRegistryUrl().get());
}

// set the security protocol (in case we are doing PLAIN_TEXT)
setProperty(runtimeConfig.securityProtocol, streamsProperties, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);
setProperty(runtimeConfig.securityProtocol(), streamsProperties, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);

// sasl
SaslConfig sc = runtimeConfig.sasl;
SaslConfig sc = runtimeConfig.sasl();
if (sc != null) {
setProperty(sc.mechanism, streamsProperties, SaslConfigs.SASL_MECHANISM);
setProperty(sc.mechanism(), streamsProperties, SaslConfigs.SASL_MECHANISM);

setProperty(sc.jaasConfig, streamsProperties, SaslConfigs.SASL_JAAS_CONFIG);
setProperty(sc.jaasConfig(), streamsProperties, SaslConfigs.SASL_JAAS_CONFIG);

setProperty(sc.clientCallbackHandlerClass, streamsProperties, SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS);
setProperty(sc.clientCallbackHandlerClass(), streamsProperties, SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS);

setProperty(sc.loginCallbackHandlerClass, streamsProperties, SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS);
setProperty(sc.loginClass, streamsProperties, SaslConfigs.SASL_LOGIN_CLASS);
setProperty(sc.loginCallbackHandlerClass(), streamsProperties, SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS);
setProperty(sc.loginClass(), streamsProperties, SaslConfigs.SASL_LOGIN_CLASS);

setProperty(sc.kerberosServiceName, streamsProperties, SaslConfigs.SASL_KERBEROS_SERVICE_NAME);
setProperty(sc.kerberosKinitCmd, streamsProperties, SaslConfigs.SASL_KERBEROS_KINIT_CMD);
setProperty(sc.kerberosTicketRenewWindowFactor, streamsProperties,
setProperty(sc.kerberosServiceName(), streamsProperties, SaslConfigs.SASL_KERBEROS_SERVICE_NAME);
setProperty(sc.kerberosKinitCmd(), streamsProperties, SaslConfigs.SASL_KERBEROS_KINIT_CMD);
setProperty(sc.kerberosTicketRenewWindowFactor(), streamsProperties,
SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR);
setProperty(sc.kerberosTicketRenewJitter, streamsProperties, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER);
setProperty(sc.kerberosMinTimeBeforeRelogin, streamsProperties, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN);
setProperty(sc.kerberosTicketRenewJitter(), streamsProperties, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER);
setProperty(sc.kerberosMinTimeBeforeRelogin(), streamsProperties,
SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN);

setProperty(sc.loginRefreshWindowFactor, streamsProperties, SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_FACTOR);
setProperty(sc.loginRefreshWindowJitter, streamsProperties, SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_JITTER);
setProperty(sc.loginRefreshWindowFactor(), streamsProperties, SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_FACTOR);
setProperty(sc.loginRefreshWindowJitter(), streamsProperties, SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_JITTER);

setProperty(sc.loginRefreshMinPeriod, streamsProperties, SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS,
setProperty(sc.loginRefreshMinPeriod(), streamsProperties, SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS,
DurationToSecondsFunction.INSTANCE);
setProperty(sc.loginRefreshBuffer, streamsProperties, SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS,
setProperty(sc.loginRefreshBuffer(), streamsProperties, SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS,
DurationToSecondsFunction.INSTANCE);
}

// ssl
SslConfig ssl = runtimeConfig.ssl;
SslConfig ssl = runtimeConfig.ssl();
if (ssl != null) {
setProperty(ssl.protocol, streamsProperties, SslConfigs.SSL_PROTOCOL_CONFIG);
setProperty(ssl.provider, streamsProperties, SslConfigs.SSL_PROVIDER_CONFIG);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,52 +6,52 @@
import java.util.Optional;
import java.util.stream.Collectors;

import io.quarkus.runtime.annotations.ConfigItem;
import io.quarkus.runtime.annotations.ConfigPhase;
import io.quarkus.runtime.annotations.ConfigRoot;
import io.smallrye.config.ConfigMapping;
import io.smallrye.config.WithDefault;

@ConfigRoot(name = "kafka-streams", phase = ConfigPhase.RUN_TIME)
public class KafkaStreamsRuntimeConfig {
@ConfigMapping(prefix = "quarkus.kafka-streams")
@ConfigRoot(phase = ConfigPhase.RUN_TIME)
public interface KafkaStreamsRuntimeConfig {

/**
* Default Kafka bootstrap server.
*/
public static final String DEFAULT_KAFKA_BROKER = "localhost:9092";
String DEFAULT_KAFKA_BROKER = "localhost:9092";

/**
* A unique identifier for this Kafka Streams application.
* If not set, defaults to quarkus.application.name.
*/
@ConfigItem(defaultValue = "${quarkus.application.name}")
public String applicationId;
@WithDefault("${quarkus.application.name}")
String applicationId();

/**
* A comma-separated list of host:port pairs identifying the Kafka bootstrap server(s).
* If not set, fallback to {@code kafka.bootstrap.servers}, and if not set either use {@code localhost:9092}.
*/
@ConfigItem(defaultValue = DEFAULT_KAFKA_BROKER)
public List<InetSocketAddress> bootstrapServers;
@WithDefault(DEFAULT_KAFKA_BROKER)
List<InetSocketAddress> bootstrapServers();

/**
* A unique identifier of this application instance, typically in the form host:port.
*/
@ConfigItem
public Optional<String> applicationServer;
Optional<String> applicationServer();

/**
* A comma-separated list of topic names.
* The pipeline will only be started once all these topics are present in the Kafka cluster
* and {@code ignore.topics} is set to false.
*/
@ConfigItem
public Optional<List<String>> topics;
Optional<List<String>> topics();

/**
* Timeout to wait for topic names to be returned from admin client.
* If set to 0 (or negative), {@code topics} check is ignored.
*/
@ConfigItem(defaultValue = "10S")
public Duration topicsTimeout;
@WithDefault("10S")
Duration topicsTimeout();

/**
* The schema registry key.
Expand All @@ -62,48 +62,33 @@ public class KafkaStreamsRuntimeConfig {
* For Apicurio Registry, use {@code apicurio.registry.url}.
* For Confluent schema registry, use {@code schema.registry.url}.
*/
@ConfigItem(defaultValue = "schema.registry.url")
public String schemaRegistryKey;
@WithDefault("schema.registry.url")
String schemaRegistryKey();

/**
* The schema registry URL.
*/
@ConfigItem
public Optional<String> schemaRegistryUrl;
Optional<String> schemaRegistryUrl();

/**
* The security protocol to use
* See https://docs.confluent.io/current/streams/developer-guide/security.html#security-example
*/
@ConfigItem(name = "security.protocol")
public Optional<String> securityProtocol;
@WithDefault("security.protocol")
Optional<String> securityProtocol();

/**
* The SASL JAAS config.
*/
public SaslConfig sasl;
SaslConfig sasl();

/**
* Kafka SSL config
*/
public SslConfig ssl;
SslConfig ssl();

@Override
public String toString() {
return "KafkaStreamsRuntimeConfig{" +
"applicationId='" + applicationId + '\'' +
", bootstrapServers=" + bootstrapServers +
", applicationServer=" + applicationServer +
", topics=" + topics +
", schemaRegistryKey='" + schemaRegistryKey + '\'' +
", schemaRegistryUrl=" + schemaRegistryUrl +
", sasl=" + sasl +
", ssl=" + ssl +
'}';
}

public List<String> getTrimmedTopics() {
return topics.orElseThrow(() -> new IllegalArgumentException("Missing list of topics"))
default List<String> getTrimmedTopics() {
return topics().orElseThrow(() -> new IllegalArgumentException("Missing list of topics"))
.stream().map(String::trim).collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,94 +4,79 @@
import java.util.Optional;

import io.quarkus.runtime.annotations.ConfigGroup;
import io.quarkus.runtime.annotations.ConfigItem;

@ConfigGroup
public class SaslConfig {
public interface SaslConfig {

/**
* SASL mechanism used for client connections
*/
@ConfigItem
public Optional<String> mechanism;
Optional<String> mechanism();

/**
* JAAS login context parameters for SASL connections in the format used by JAAS configuration files
*/
@ConfigItem
public Optional<String> jaasConfig;
Optional<String> jaasConfig();

/**
* The fully qualified name of a SASL client callback handler class
*/
@ConfigItem
public Optional<String> clientCallbackHandlerClass;
Optional<String> clientCallbackHandlerClass();

/**
* The fully qualified name of a SASL login callback handler class
*/
@ConfigItem
public Optional<String> loginCallbackHandlerClass;
Optional<String> loginCallbackHandlerClass();

/**
* The fully qualified name of a class that implements the Login interface
*/
@ConfigItem
public Optional<String> loginClass;
Optional<String> loginClass();

/**
* The Kerberos principal name that Kafka runs as
*/
@ConfigItem
public Optional<String> kerberosServiceName;
Optional<String> kerberosServiceName();

/**
* Kerberos kinit command path
*/
@ConfigItem
public Optional<String> kerberosKinitCmd;
Optional<String> kerberosKinitCmd();

/**
* Login thread will sleep until the specified window factor of time from last refresh
*/
@ConfigItem
public Optional<Double> kerberosTicketRenewWindowFactor;
Optional<Double> kerberosTicketRenewWindowFactor();

/**
* Percentage of random jitter added to the renewal time
*/
@ConfigItem
public Optional<Double> kerberosTicketRenewJitter;
Optional<Double> kerberosTicketRenewJitter();

/**
* Percentage of random jitter added to the renewal time
*/
@ConfigItem
public Optional<Long> kerberosMinTimeBeforeRelogin;
Optional<Long> kerberosMinTimeBeforeRelogin();

/**
* Login refresh thread will sleep until the specified window factor relative to the
* credential's lifetime has been reached-
*/
@ConfigItem
public Optional<Double> loginRefreshWindowFactor;
Optional<Double> loginRefreshWindowFactor();

/**
* The maximum amount of random jitter relative to the credential's lifetime
*/
@ConfigItem
public Optional<Double> loginRefreshWindowJitter;
Optional<Double> loginRefreshWindowJitter();

/**
* The desired minimum duration for the login refresh thread to wait before refreshing a credential
*/
@ConfigItem
public Optional<Duration> loginRefreshMinPeriod;
Optional<Duration> loginRefreshMinPeriod();

/**
* The amount of buffer duration before credential expiration to maintain when refreshing a credential
*/
@ConfigItem
public Optional<Duration> loginRefreshBuffer;
Optional<Duration> loginRefreshBuffer();

}
Loading

0 comments on commit c85dffd

Please sign in to comment.