diff --git a/extensions/kafka-streams/deployment/pom.xml b/extensions/kafka-streams/deployment/pom.xml index 392701b2a2a759..d391443cf2f516 100644 --- a/extensions/kafka-streams/deployment/pom.xml +++ b/extensions/kafka-streams/deployment/pom.xml @@ -66,9 +66,6 @@ ${project.version} - - -AlegacyConfigRoot=true - diff --git a/extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/KafkaStreamsBuildTimeConfig.java b/extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/KafkaStreamsBuildTimeConfig.java index ef449bef60cfd9..a1e931cc1c95f9 100644 --- a/extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/KafkaStreamsBuildTimeConfig.java +++ b/extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/KafkaStreamsBuildTimeConfig.java @@ -1,15 +1,19 @@ package io.quarkus.kafka.streams.deployment; -import io.quarkus.runtime.annotations.ConfigItem; import io.quarkus.runtime.annotations.ConfigPhase; import io.quarkus.runtime.annotations.ConfigRoot; +import io.smallrye.config.ConfigMapping; +import io.smallrye.config.WithDefault; +import io.smallrye.config.WithName; -@ConfigRoot(name = "kafka-streams", phase = ConfigPhase.BUILD_TIME) -public class KafkaStreamsBuildTimeConfig { +@ConfigMapping(prefix = "quarkus.kafka-streams") +@ConfigRoot(phase = ConfigPhase.BUILD_TIME) +public interface KafkaStreamsBuildTimeConfig { /** * Whether a health check is published in case the smallrye-health extension is present (defaults to true). */ - @ConfigItem(name = "health.enabled", defaultValue = "true") - public boolean healthEnabled; + @WithName("health.enabled") + @WithDefault("true") + boolean healthEnabled(); } diff --git a/extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/KafkaStreamsProcessor.java b/extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/KafkaStreamsProcessor.java index 71ff4ebc25cf91..10f6024a18a2e2 100644 --- a/extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/KafkaStreamsProcessor.java +++ b/extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/KafkaStreamsProcessor.java @@ -213,11 +213,11 @@ void addHealthChecks(KafkaStreamsBuildTimeConfig buildTimeConfig, BuildProducer< healthChecks.produce( new HealthBuildItem( "io.quarkus.kafka.streams.runtime.health.KafkaStreamsTopicsHealthCheck", - buildTimeConfig.healthEnabled)); + buildTimeConfig.healthEnabled())); healthChecks.produce( new HealthBuildItem( "io.quarkus.kafka.streams.runtime.health.KafkaStreamsStateHealthCheck", - buildTimeConfig.healthEnabled)); + buildTimeConfig.healthEnabled())); } @BuildStep diff --git a/extensions/kafka-streams/runtime/pom.xml b/extensions/kafka-streams/runtime/pom.xml index e89e81725e8f4c..f3c807cec5ee4a 100644 --- a/extensions/kafka-streams/runtime/pom.xml +++ b/extensions/kafka-streams/runtime/pom.xml @@ -76,9 +76,6 @@ ${project.version} - - -AlegacyConfigRoot=true - diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsProducer.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsProducer.java index 078fa15e338f7a..e5b86101b7b0c2 100644 --- a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsProducer.java +++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsProducer.java @@ -93,7 +93,7 @@ public KafkaStreamsProducer(KafkaStreamsSupport kafkaStreamsSupport, KafkaStream Properties buildTimeProperties = kafkaStreamsSupport.getProperties(); - String bootstrapServersConfig = asString(runtimeConfig.bootstrapServers); + String bootstrapServersConfig = asString(runtimeConfig.bootstrapServers()); if (DEFAULT_KAFKA_BROKER.equalsIgnoreCase(bootstrapServersConfig)) { // Try to see if kafka.bootstrap.servers is set, if so, use that value, if not, keep localhost:9092 bootstrapServersConfig = ConfigProvider.getConfig().getOptionalValue("kafka.bootstrap.servers", String.class) @@ -109,7 +109,7 @@ public KafkaStreamsProducer(KafkaStreamsSupport kafkaStreamsSupport, KafkaStream this.executorService = executorService; - this.topicsTimeout = runtimeConfig.topicsTimeout; + this.topicsTimeout = runtimeConfig.topicsTimeout(); this.trimmedTopics = isTopicsCheckEnabled() ? runtimeConfig.getTrimmedTopics() : Collections.emptyList(); this.streamsConfig = new StreamsConfig(kafkaStreamsProperties); this.kafkaStreams = initializeKafkaStreams(streamsConfig, topology.get(), @@ -217,51 +217,52 @@ private static Properties getStreamsProperties(Properties properties, // add runtime options streamsProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig); - streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, runtimeConfig.applicationId); + streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, runtimeConfig.applicationId()); // app id - if (runtimeConfig.applicationServer.isPresent()) { - streamsProperties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, runtimeConfig.applicationServer.get()); + if (runtimeConfig.applicationServer().isPresent()) { + streamsProperties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, runtimeConfig.applicationServer().get()); } // schema registry - if (runtimeConfig.schemaRegistryUrl.isPresent()) { - streamsProperties.put(runtimeConfig.schemaRegistryKey, runtimeConfig.schemaRegistryUrl.get()); + if (runtimeConfig.schemaRegistryUrl().isPresent()) { + streamsProperties.put(runtimeConfig.schemaRegistryKey(), runtimeConfig.schemaRegistryUrl().get()); } // set the security protocol (in case we are doing PLAIN_TEXT) - setProperty(runtimeConfig.securityProtocol, streamsProperties, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG); + setProperty(runtimeConfig.securityProtocol(), streamsProperties, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG); // sasl - SaslConfig sc = runtimeConfig.sasl; + SaslConfig sc = runtimeConfig.sasl(); if (sc != null) { - setProperty(sc.mechanism, streamsProperties, SaslConfigs.SASL_MECHANISM); + setProperty(sc.mechanism(), streamsProperties, SaslConfigs.SASL_MECHANISM); - setProperty(sc.jaasConfig, streamsProperties, SaslConfigs.SASL_JAAS_CONFIG); + setProperty(sc.jaasConfig(), streamsProperties, SaslConfigs.SASL_JAAS_CONFIG); - setProperty(sc.clientCallbackHandlerClass, streamsProperties, SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS); + setProperty(sc.clientCallbackHandlerClass(), streamsProperties, SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS); - setProperty(sc.loginCallbackHandlerClass, streamsProperties, SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS); - setProperty(sc.loginClass, streamsProperties, SaslConfigs.SASL_LOGIN_CLASS); + setProperty(sc.loginCallbackHandlerClass(), streamsProperties, SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS); + setProperty(sc.loginClass(), streamsProperties, SaslConfigs.SASL_LOGIN_CLASS); - setProperty(sc.kerberosServiceName, streamsProperties, SaslConfigs.SASL_KERBEROS_SERVICE_NAME); - setProperty(sc.kerberosKinitCmd, streamsProperties, SaslConfigs.SASL_KERBEROS_KINIT_CMD); - setProperty(sc.kerberosTicketRenewWindowFactor, streamsProperties, + setProperty(sc.kerberosServiceName(), streamsProperties, SaslConfigs.SASL_KERBEROS_SERVICE_NAME); + setProperty(sc.kerberosKinitCmd(), streamsProperties, SaslConfigs.SASL_KERBEROS_KINIT_CMD); + setProperty(sc.kerberosTicketRenewWindowFactor(), streamsProperties, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR); - setProperty(sc.kerberosTicketRenewJitter, streamsProperties, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER); - setProperty(sc.kerberosMinTimeBeforeRelogin, streamsProperties, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN); + setProperty(sc.kerberosTicketRenewJitter(), streamsProperties, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER); + setProperty(sc.kerberosMinTimeBeforeRelogin(), streamsProperties, + SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN); - setProperty(sc.loginRefreshWindowFactor, streamsProperties, SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_FACTOR); - setProperty(sc.loginRefreshWindowJitter, streamsProperties, SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_JITTER); + setProperty(sc.loginRefreshWindowFactor(), streamsProperties, SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_FACTOR); + setProperty(sc.loginRefreshWindowJitter(), streamsProperties, SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_JITTER); - setProperty(sc.loginRefreshMinPeriod, streamsProperties, SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS, + setProperty(sc.loginRefreshMinPeriod(), streamsProperties, SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS, DurationToSecondsFunction.INSTANCE); - setProperty(sc.loginRefreshBuffer, streamsProperties, SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS, + setProperty(sc.loginRefreshBuffer(), streamsProperties, SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS, DurationToSecondsFunction.INSTANCE); } // ssl - SslConfig ssl = runtimeConfig.ssl; + SslConfig ssl = runtimeConfig.ssl(); if (ssl != null) { setProperty(ssl.protocol, streamsProperties, SslConfigs.SSL_PROTOCOL_CONFIG); setProperty(ssl.provider, streamsProperties, SslConfigs.SSL_PROVIDER_CONFIG); diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsRuntimeConfig.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsRuntimeConfig.java index 7a5b58d54a5802..0a44480ccf6104 100644 --- a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsRuntimeConfig.java +++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsRuntimeConfig.java @@ -6,52 +6,52 @@ import java.util.Optional; import java.util.stream.Collectors; -import io.quarkus.runtime.annotations.ConfigItem; import io.quarkus.runtime.annotations.ConfigPhase; import io.quarkus.runtime.annotations.ConfigRoot; +import io.smallrye.config.ConfigMapping; +import io.smallrye.config.WithDefault; -@ConfigRoot(name = "kafka-streams", phase = ConfigPhase.RUN_TIME) -public class KafkaStreamsRuntimeConfig { +@ConfigMapping(prefix = "quarkus.kafka-streams") +@ConfigRoot(phase = ConfigPhase.RUN_TIME) +public interface KafkaStreamsRuntimeConfig { /** * Default Kafka bootstrap server. */ - public static final String DEFAULT_KAFKA_BROKER = "localhost:9092"; + String DEFAULT_KAFKA_BROKER = "localhost:9092"; /** * A unique identifier for this Kafka Streams application. * If not set, defaults to quarkus.application.name. */ - @ConfigItem(defaultValue = "${quarkus.application.name}") - public String applicationId; + @WithDefault("${quarkus.application.name}") + String applicationId(); /** * A comma-separated list of host:port pairs identifying the Kafka bootstrap server(s). * If not set, fallback to {@code kafka.bootstrap.servers}, and if not set either use {@code localhost:9092}. */ - @ConfigItem(defaultValue = DEFAULT_KAFKA_BROKER) - public List bootstrapServers; + @WithDefault(DEFAULT_KAFKA_BROKER) + List bootstrapServers(); /** * A unique identifier of this application instance, typically in the form host:port. */ - @ConfigItem - public Optional applicationServer; + Optional applicationServer(); /** * A comma-separated list of topic names. * The pipeline will only be started once all these topics are present in the Kafka cluster * and {@code ignore.topics} is set to false. */ - @ConfigItem - public Optional> topics; + Optional> topics(); /** * Timeout to wait for topic names to be returned from admin client. * If set to 0 (or negative), {@code topics} check is ignored. */ - @ConfigItem(defaultValue = "10S") - public Duration topicsTimeout; + @WithDefault("10S") + Duration topicsTimeout(); /** * The schema registry key. @@ -62,48 +62,33 @@ public class KafkaStreamsRuntimeConfig { * For Apicurio Registry, use {@code apicurio.registry.url}. * For Confluent schema registry, use {@code schema.registry.url}. */ - @ConfigItem(defaultValue = "schema.registry.url") - public String schemaRegistryKey; + @WithDefault("schema.registry.url") + String schemaRegistryKey(); /** * The schema registry URL. */ - @ConfigItem - public Optional schemaRegistryUrl; + Optional schemaRegistryUrl(); /** * The security protocol to use * See https://docs.confluent.io/current/streams/developer-guide/security.html#security-example */ - @ConfigItem(name = "security.protocol") - public Optional securityProtocol; + @WithDefault("security.protocol") + Optional securityProtocol(); /** * The SASL JAAS config. */ - public SaslConfig sasl; + SaslConfig sasl(); /** * Kafka SSL config */ - public SslConfig ssl; + SslConfig ssl(); - @Override - public String toString() { - return "KafkaStreamsRuntimeConfig{" + - "applicationId='" + applicationId + '\'' + - ", bootstrapServers=" + bootstrapServers + - ", applicationServer=" + applicationServer + - ", topics=" + topics + - ", schemaRegistryKey='" + schemaRegistryKey + '\'' + - ", schemaRegistryUrl=" + schemaRegistryUrl + - ", sasl=" + sasl + - ", ssl=" + ssl + - '}'; - } - - public List getTrimmedTopics() { - return topics.orElseThrow(() -> new IllegalArgumentException("Missing list of topics")) + default List getTrimmedTopics() { + return topics().orElseThrow(() -> new IllegalArgumentException("Missing list of topics")) .stream().map(String::trim).collect(Collectors.toList()); } } diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/SaslConfig.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/SaslConfig.java index 79dd6ac72efe59..b1b6f2e415718e 100644 --- a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/SaslConfig.java +++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/SaslConfig.java @@ -4,94 +4,79 @@ import java.util.Optional; import io.quarkus.runtime.annotations.ConfigGroup; -import io.quarkus.runtime.annotations.ConfigItem; @ConfigGroup -public class SaslConfig { +public interface SaslConfig { /** * SASL mechanism used for client connections */ - @ConfigItem - public Optional mechanism; + Optional mechanism(); /** * JAAS login context parameters for SASL connections in the format used by JAAS configuration files */ - @ConfigItem - public Optional jaasConfig; + Optional jaasConfig(); /** * The fully qualified name of a SASL client callback handler class */ - @ConfigItem - public Optional clientCallbackHandlerClass; + Optional clientCallbackHandlerClass(); /** * The fully qualified name of a SASL login callback handler class */ - @ConfigItem - public Optional loginCallbackHandlerClass; + Optional loginCallbackHandlerClass(); /** * The fully qualified name of a class that implements the Login interface */ - @ConfigItem - public Optional loginClass; + Optional loginClass(); /** * The Kerberos principal name that Kafka runs as */ - @ConfigItem - public Optional kerberosServiceName; + Optional kerberosServiceName(); /** * Kerberos kinit command path */ - @ConfigItem - public Optional kerberosKinitCmd; + Optional kerberosKinitCmd(); /** * Login thread will sleep until the specified window factor of time from last refresh */ - @ConfigItem - public Optional kerberosTicketRenewWindowFactor; + Optional kerberosTicketRenewWindowFactor(); /** * Percentage of random jitter added to the renewal time */ - @ConfigItem - public Optional kerberosTicketRenewJitter; + Optional kerberosTicketRenewJitter(); /** * Percentage of random jitter added to the renewal time */ - @ConfigItem - public Optional kerberosMinTimeBeforeRelogin; + Optional kerberosMinTimeBeforeRelogin(); /** * Login refresh thread will sleep until the specified window factor relative to the * credential's lifetime has been reached- */ - @ConfigItem - public Optional loginRefreshWindowFactor; + Optional loginRefreshWindowFactor(); /** * The maximum amount of random jitter relative to the credential's lifetime */ - @ConfigItem - public Optional loginRefreshWindowJitter; + Optional loginRefreshWindowJitter(); /** * The desired minimum duration for the login refresh thread to wait before refreshing a credential */ - @ConfigItem - public Optional loginRefreshMinPeriod; + Optional loginRefreshMinPeriod(); /** * The amount of buffer duration before credential expiration to maintain when refreshing a credential */ - @ConfigItem - public Optional loginRefreshBuffer; + Optional loginRefreshBuffer(); } diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/health/KafkaStreamsTopicsHealthCheck.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/health/KafkaStreamsTopicsHealthCheck.java index 84c23f440df87b..032f048c439fc6 100644 --- a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/health/KafkaStreamsTopicsHealthCheck.java +++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/health/KafkaStreamsTopicsHealthCheck.java @@ -35,8 +35,8 @@ public class KafkaStreamsTopicsHealthCheck implements HealthCheck { @PostConstruct public void init() { - if (kafkaStreamsRuntimeConfig.topicsTimeout.compareTo(Duration.ZERO) > 0) { - trimmedTopics = kafkaStreamsRuntimeConfig.topics + if (kafkaStreamsRuntimeConfig.topicsTimeout().compareTo(Duration.ZERO) > 0) { + trimmedTopics = kafkaStreamsRuntimeConfig.topics() .orElseThrow(() -> new IllegalArgumentException("Missing list of topics")) .stream() .map(String::trim) @@ -49,7 +49,7 @@ public HealthCheckResponse call() { HealthCheckResponseBuilder builder = HealthCheckResponse.named("Kafka Streams topics health check").up(); if (trimmedTopics != null) { try { - Set missingTopics = manager.getMissingTopics(trimmedTopics, kafkaStreamsRuntimeConfig.topicsTimeout); + Set missingTopics = manager.getMissingTopics(trimmedTopics, kafkaStreamsRuntimeConfig.topicsTimeout()); List availableTopics = new ArrayList<>(trimmedTopics); availableTopics.removeAll(missingTopics); diff --git a/extensions/kafka-streams/runtime/src/test/java/io/quarkus/kafka/streams/runtime/health/KafkaStreamsTopicsHealthCheckTest.java b/extensions/kafka-streams/runtime/src/test/java/io/quarkus/kafka/streams/runtime/health/KafkaStreamsTopicsHealthCheckTest.java index 9fa11bb2657022..0820b29e8334c1 100644 --- a/extensions/kafka-streams/runtime/src/test/java/io/quarkus/kafka/streams/runtime/health/KafkaStreamsTopicsHealthCheckTest.java +++ b/extensions/kafka-streams/runtime/src/test/java/io/quarkus/kafka/streams/runtime/health/KafkaStreamsTopicsHealthCheckTest.java @@ -31,10 +31,13 @@ public class KafkaStreamsTopicsHealthCheckTest { public void setUp() { MockitoAnnotations.initMocks(this); - KafkaStreamsRuntimeConfig kafkaStreamsRuntimeConfig = new KafkaStreamsRuntimeConfig(); - kafkaStreamsRuntimeConfig.topics = Optional.of(Collections.singletonList("topic")); - kafkaStreamsRuntimeConfig.topicsTimeout = Duration.ofSeconds(10); - healthCheck.kafkaStreamsRuntimeConfig = kafkaStreamsRuntimeConfig; + KafkaStreamsRuntimeConfig configMock = Mockito.mock(KafkaStreamsRuntimeConfig.class); + Mockito.doReturn(Optional.of(Collections.singletonList("topic"))) + .when(configMock).topics(); + + Mockito.doReturn(Duration.ofSeconds(10)) + .when(configMock).topicsTimeout(); + healthCheck.kafkaStreamsRuntimeConfig = configMock; healthCheck.init(); }