From fa54d6c9009def239646935c4f32caac55441c38 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CNithin?= Date: Sun, 7 Apr 2024 22:00:44 +0530 Subject: [PATCH 1/3] Change default partition strategy from Range based to Round-Robin MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: “Nithin --- .../hlf/java/rest/client/config/KafkaConsumerConfig.java | 5 +++++ .../hlf/java/rest/client/util/FabricClientConstants.java | 2 ++ 2 files changed, 7 insertions(+) diff --git a/src/main/java/hlf/java/rest/client/config/KafkaConsumerConfig.java b/src/main/java/hlf/java/rest/client/config/KafkaConsumerConfig.java index 6918fd4..d3a626d 100644 --- a/src/main/java/hlf/java/rest/client/config/KafkaConsumerConfig.java +++ b/src/main/java/hlf/java/rest/client/config/KafkaConsumerConfig.java @@ -43,6 +43,11 @@ public DefaultKafkaConsumerFactory consumerFactory( props.put( ConsumerConfig.MAX_POLL_RECORDS_CONFIG, FabricClientConstants.KAFKA_INTG_MAX_POLL_RECORDS); + // Distribute available partitions evenly across all consumers (or consumer threads) + props.put( + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, + FabricClientConstants.ROUND_ROBIN_CONSUMER_PARTITION_ASSIGNEMENT_STRATEGY); + // Azure event-hub config configureSaslProperties(props, kafkaConsumerProperties.getSaslJaasConfig()); diff --git a/src/main/java/hlf/java/rest/client/util/FabricClientConstants.java b/src/main/java/hlf/java/rest/client/util/FabricClientConstants.java index c0371f7..6a307c5 100644 --- a/src/main/java/hlf/java/rest/client/util/FabricClientConstants.java +++ b/src/main/java/hlf/java/rest/client/util/FabricClientConstants.java @@ -80,4 +80,6 @@ public class FabricClientConstants { public static final String JSON_PATH_CERTIFICATE = "$..certificate"; public static final String VALUE_TAG_CAPABILITIES = "Capabilities"; + public static final String ROUND_ROBIN_CONSUMER_PARTITION_ASSIGNEMENT_STRATEGY = + "org.apache.kafka.clients.consumer.RoundRobinAssignor"; } From 3d09021050c86f1ef4fb0f4d57f5b7481bd90cac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CNithin?= Date: Sun, 7 Apr 2024 22:01:57 +0530 Subject: [PATCH 2/3] Improved metric details for SSL certs expiry MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: “Nithin --- .../rest/client/config/BaseKafkaConfig.java | 11 +++++++++++ .../rest/client/config/SSLAuthFilesHelper.java | 17 ++++++++++++++--- 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/src/main/java/hlf/java/rest/client/config/BaseKafkaConfig.java b/src/main/java/hlf/java/rest/client/config/BaseKafkaConfig.java index 38ab13b..0e0cf54 100644 --- a/src/main/java/hlf/java/rest/client/config/BaseKafkaConfig.java +++ b/src/main/java/hlf/java/rest/client/config/BaseKafkaConfig.java @@ -4,7 +4,10 @@ import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.MeterRegistry; import java.sql.Timestamp; +import java.text.DateFormat; +import java.text.SimpleDateFormat; import java.time.Instant; +import java.util.Date; import java.util.Map; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.BooleanUtils; @@ -57,18 +60,24 @@ protected void configureSSLProperties( SSLAuthFilesHelper.getExpiryTimestampForKeyStore( sslProperties.getSslTruststoreLocation(), sslProperties.getSslTruststorePassword()); + DateFormat formatter = new SimpleDateFormat("dd/MM/yyyy"); + String guagePrefix = getConfigType().equals(ConfigType.CONSUMER) ? "consumer." : "producer."; Gauge.builder( guagePrefix + topicName + ".keystore.expiryTs", keyStoreCertExpiryTimestamp::getTime) + .tag("topic-name", topicName) + .tag("expiry-date", formatter.format(new Date(keyStoreCertExpiryTimestamp.getTime()))) .strongReference(true) .register(sslMetricsRegistry); Gauge.builder( guagePrefix + topicName + ".truststore.expiryTs", trustStoreCertExpiryTimestamp::getTime) + .tag("topic-name", topicName) + .tag("expiry-date", formatter.format(new Date(trustStoreCertExpiryTimestamp.getTime()))) .strongReference(true) .register(sslMetricsRegistry); @@ -81,6 +90,7 @@ protected void configureSSLProperties( guagePrefix + topicName + ".keystore.hasExpired", hasKeyStoreCertExpired, BooleanUtils::toInteger) + .tag("topic-name", topicName) .strongReference(true) .register(sslMetricsRegistry); @@ -88,6 +98,7 @@ protected void configureSSLProperties( guagePrefix + topicName + ".truststore.hasExpired", hasTrustStoreCertExpired, BooleanUtils::toInteger) + .tag("topic-name", topicName) .strongReference(true) .register(sslMetricsRegistry); diff --git a/src/main/java/hlf/java/rest/client/config/SSLAuthFilesHelper.java b/src/main/java/hlf/java/rest/client/config/SSLAuthFilesHelper.java index 3c2c4b1..db676ec 100644 --- a/src/main/java/hlf/java/rest/client/config/SSLAuthFilesHelper.java +++ b/src/main/java/hlf/java/rest/client/config/SSLAuthFilesHelper.java @@ -12,10 +12,14 @@ import java.security.cert.CertificateException; import java.security.cert.X509Certificate; import java.sql.Timestamp; +import java.util.ArrayList; import java.util.Base64; +import java.util.Collections; import java.util.Enumeration; +import java.util.List; import lombok.experimental.UtilityClass; import lombok.extern.slf4j.Slf4j; +import org.springframework.util.CollectionUtils; @Slf4j @UtilityClass @@ -57,18 +61,25 @@ Timestamp getExpiryTimestampForKeyStore(String keyStorePath, String keyStorePass KeyStore keyStore = loadKeyStore(keyStorePath, keyStorePassword); + List certExpiryTimestamps = new ArrayList<>(); + Enumeration aliases = keyStore.aliases(); while (aliases.hasMoreElements()) { String alias = aliases.nextElement(); Certificate cert = keyStore.getCertificate(alias); if (cert instanceof X509Certificate) { X509Certificate x509Cert = (X509Certificate) cert; - return new Timestamp(x509Cert.getNotAfter().getTime()); + certExpiryTimestamps.add(new Timestamp(x509Cert.getNotAfter().getTime())); } } - throw new CertificateException( - "Couldn't extract an instance of X509Certificate for fetching expiry details"); + if (CollectionUtils.isEmpty(certExpiryTimestamps)) { + throw new CertificateException( + "Couldn't extract an instance of X509Certificate for fetching expiry details"); + } + + // Return the earliest (minimum) timestamp from the list + return Collections.min(certExpiryTimestamps); } private static KeyStore loadKeyStore(String path, String password) From 9600b0ce474bb107dfc4b1a66eb24839a311a2cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CNithin?= Date: Mon, 8 Apr 2024 08:47:52 +0530 Subject: [PATCH 3/3] Improvement in Listerner error metrics Aspect MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: “Nithin --- .../metrics/EmitCustomTransactionListenerMetricsAspect.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/java/hlf/java/rest/client/metrics/EmitCustomTransactionListenerMetricsAspect.java b/src/main/java/hlf/java/rest/client/metrics/EmitCustomTransactionListenerMetricsAspect.java index e3898e9..ecf3dcd 100644 --- a/src/main/java/hlf/java/rest/client/metrics/EmitCustomTransactionListenerMetricsAspect.java +++ b/src/main/java/hlf/java/rest/client/metrics/EmitCustomTransactionListenerMetricsAspect.java @@ -38,10 +38,12 @@ public Object interceptedKafkaMetricsEmissionAdvice(ProceedingJoinPoint proceedi if (e instanceof UnrecognizedTransactionPayloadException) { invalidInboundTransactionMessageCounter.increment(); + throw e; } if (e instanceof ContractException) { inboundTxnContractExceptionCounter.increment(); + throw e; } inboundTxnProcessingFailureCounter.increment();