Support multiple outbound topics for publishing blockchain events #152

Merged (3 commits) on Nov 18, 2024
4 changes: 2 additions & 2 deletions scripts/run-integration-tests.sh
@@ -12,7 +12,7 @@ cd "$(dirname "$0")"
source pull-fabric-images.sh

pushd ../src/test/java/fabricSetup/ >/dev/null
docker-compose up --force-recreate -d
docker compose up --force-recreate -d
popd >/dev/null && cd ..

docker ps -a
@@ -22,4 +22,4 @@ export MAVEN_OPTS="-Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Sl
mvn clean test-compile failsafe:integration-test -Dmaven.test.failure.ignore=false

pushd src/test/java/fabricSetup/ >/dev/null
docker-compose down --volumes
docker compose down --volumes
@@ -68,14 +68,21 @@ public static class Events {
// preferred for providing Chaincode details for Event subscription
private List<String> chaincode;
private boolean standardCCEventEnabled;
private List<String> block;
private List<BlockDetails> blockDetails;
private List<ChaincodeDetails> chaincodeDetails;
}

@Data
public static class BlockDetails {
private String channelName;
private List<String> listenerTopics;
}

@Data
public static class ChaincodeDetails {
private String channelName;
private String chaincodeId;
private List<String> listenerTopics;
}

/**
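For context, each BlockDetails entry (and each ChaincodeDetails entry) now carries a listenerTopics list, so a single channel or chaincode can publish to several outbound topics. A minimal hedged sketch of populating these Lombok @Data holders; the channel, chaincode, and topic names are placeholders, not values taken from this PR (uses java.util.Arrays):

// Placeholder values for illustration only; @Data generates the setters used below.
FabricProperties.BlockDetails blockDetails = new FabricProperties.BlockDetails();
blockDetails.setChannelName("channel1");
blockDetails.setListenerTopics(Arrays.asList("block-topic-a", "block-topic-b"));

FabricProperties.ChaincodeDetails chaincodeDetails = new FabricProperties.ChaincodeDetails();
chaincodeDetails.setChannelName("channel1");
chaincodeDetails.setChaincodeId("asset-transfer");
chaincodeDetails.setListenerTopics(Arrays.asList("chaincode-topic"));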
@@ -7,6 +7,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.Consumer;
@@ -22,6 +23,7 @@
import org.springframework.kafka.listener.ConsumerAwareRecordRecoverer;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.CollectionUtils;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
@@ -59,11 +61,16 @@ public CommonErrorHandler topicTransactionErrorHandler() {

deadLetterPublishingRecoverer =
generateRecordRecovererWithPublisher(kafkaProperties.getFailedMessageListener());
} else if (Objects.nonNull(kafkaProperties.getEventListener())
&& kafkaProperties.getEventListener().isListenToFailedMessages()) {

deadLetterPublishingRecoverer =
generateRecordRecovererWithPublisher(kafkaProperties.getEventListener());
} else if (!CollectionUtils.isEmpty(kafkaProperties.getEventListeners())) {

Optional<KafkaProperties.EventProducer> eventProducerOptional =
kafkaProperties.getEventListeners().stream()
.filter(KafkaProperties.EventProducer::isListenToFailedMessages)
.findAny();
if (eventProducerOptional.isPresent()) {
deadLetterPublishingRecoverer =
generateRecordRecovererWithPublisher(eventProducerOptional.get());
}
}

/*
@@ -103,7 +110,7 @@ public void accept(
private DeadLetterPublishingRecoverer generateRecordRecovererWithPublisher(
KafkaProperties.Producer destination) {

KafkaTemplate<String, String> deadLetterPublisherTemplate =
KafkaTemplate<Object, Object> deadLetterPublisherTemplate =
new KafkaTemplate<>(kafkaProducerConfig.eventProducerFactory(destination));
deadLetterPublisherTemplate.setDefaultTopic(destination.getTopic());

@@ -1,35 +1,41 @@
package hlf.java.rest.client.config;

import hlf.java.rest.client.exception.ErrorCode;
import hlf.java.rest.client.exception.ServiceException;
import io.micrometer.core.instrument.MeterRegistry;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.MicrometerProducerListener;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.core.RoutingKafkaTemplate;

/** This class is the configuration class for sending Chaincode events to the eventHub/Kafka Topic. */
@Slf4j
@Configuration
@ConditionalOnProperty("kafka.event-listener.brokerHost")
@ConditionalOnProperty("kafka.event-listeners[0].brokerHost")
@RefreshScope
public class KafkaProducerConfig extends BaseKafkaConfig {

private static final String PRODUCER_ALL_ACKS = "all";
private static final int RETRIES_CONFIG_FOR_AT_MOST_ONCE = 0;

@Autowired private KafkaProperties kafkaProperties;

@Autowired private MeterRegistry meterRegistry;

public ProducerFactory<String, String> eventProducerFactory(
public ProducerFactory<Object, Object> eventProducerFactory(
KafkaProperties.Producer kafkaProducerProperties) {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProducerProperties.getBrokerHost());
@@ -60,7 +66,7 @@ public ProducerFactory<String, String> eventProducerFactory(

log.info("Generating Kafka producer factory..");

DefaultKafkaProducerFactory<String, String> defaultKafkaProducerFactory =
DefaultKafkaProducerFactory<Object, Object> defaultKafkaProducerFactory =
new DefaultKafkaProducerFactory<>(props);
defaultKafkaProducerFactory.addListener(new MicrometerProducerListener<>(meterRegistry));

@@ -69,8 +75,24 @@ public ProducerFactory<String, String> eventProducerFactory(

@Bean
@RefreshScope
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(eventProducerFactory(kafkaProperties.getEventListener()));
public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context) {

Set<String> topicSet = new HashSet<>();
Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
for (KafkaProperties.EventProducer eventProducer : kafkaProperties.getEventListeners()) {
if (topicSet.contains(eventProducer.getTopic())) {
throw new ServiceException(ErrorCode.NOT_SUPPORTED, "Topic name should be unique");
}
topicSet.add(eventProducer.getTopic());
ProducerFactory<Object, Object> defaultKafkaProducerFactory =
eventProducerFactory(eventProducer);
context.registerBean(
eventProducer.getTopic() + "PF",
ProducerFactory.class,
() -> defaultKafkaProducerFactory);
map.put(Pattern.compile(eventProducer.getTopic()), defaultKafkaProducerFactory);
}
return new RoutingKafkaTemplate(map);
}

@Override
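The new routingTemplate bean replaces the previous single kafkaTemplate: it builds one producer factory per configured event listener, rejects duplicate topic names, and keys the routing map with a Pattern compiled from each topic, so RoutingKafkaTemplate selects the matching factory when a record is sent. A minimal hedged usage sketch; the topic, key, and payload values below are placeholders:

// Hypothetical send through the injected RoutingKafkaTemplate bean. The template
// extends KafkaTemplate<Object, Object>, so plain String keys and values are accepted;
// the destination topic must match one of the patterns registered in routingTemplate().
routingKafkaTemplate.send("block-topic-a", "messageKey", payload);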
@@ -20,7 +20,7 @@
public class KafkaProperties {

private List<Consumer> integrationPoints;
private EventProducer eventListener;
private List<EventProducer> eventListeners;
private Producer failedMessageListener;

@Getter
@@ -47,13 +47,15 @@ public void onRefresh(RefreshScopeRefreshedEvent event) {
startEventListener();
}

public void startEventListener() {
private void startEventListener() {

try {
List<String> blockChannelNames = fabricProperties.getEvents().getBlock();
if (!CollectionUtils.isEmpty(blockChannelNames)) {
List<FabricProperties.BlockDetails> blockDetailsList =
fabricProperties.getEvents().getBlockDetails();
if (!CollectionUtils.isEmpty(blockDetailsList)) {

for (String channelName : blockChannelNames) {
for (FabricProperties.BlockDetails blockDetails : blockDetailsList) {
String channelName = blockDetails.getChannelName();
log.info("channel names {}", channelName);
Network network = gateway.getNetwork(channelName);

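The listener now iterates BlockDetails entries instead of a flat list of channel names; per the PR title, each entry's listenerTopics is what lets one blockchain event go to multiple outbound topics. A hedged sketch of that fan-out, not taken verbatim from this diff; messageKey and payload are placeholders:

// Hedged sketch: publish the block event to every topic configured for this channel.
for (String topic : blockDetails.getListenerTopics()) {
  // RoutingKafkaTemplate resolves the producer factory whose pattern matches the topic.
  routingKafkaTemplate.send(topic, messageKey, payload);
}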
@@ -6,28 +6,17 @@
* The EventPublishService is a service class which includes the kafka template. It sends the
* message to the Event Kafka message topic
*/
@ConditionalOnProperty("kafka.event-listener.brokerHost")
@ConditionalOnProperty("kafka.event-listeners[0].brokerHost")
public interface EventPublishService {

/**
* @param payload String message payload
* @param fabricTxId String Fabric transaction ID
* @param eventName String chaincode event-name
* @param channelName String Name of the channel where the event was generated.
* @return status boolean status of msg sent
*/
boolean sendMessage(
final String payload, String fabricTxId, String eventName, String channelName);

/**
* @param payload String message payload
* @param fabricTxId String Fabric transaction ID
* @param eventName String chaincode event-name
* @param channelName String Name of the channel where the event was generated.
* @param messageKey associated key for the payload.
* @return status boolean status of msg sent
*/
boolean publishChaincodeEvents(
void publishChaincodeEvents(
final String payload,
String chaincodeName,
String fabricTxId,
@@ -42,9 +31,8 @@ boolean publishChaincodeEvents(
* @param channelName String Name of the channel where the event was generated.
* @param functionName String Name of the function name.
* @param isPrivateDataPresent boolean flag to check if privateData present in payload
* @return status boolean status of msg sent
*/
boolean publishBlockEvents(
void publishBlockEvents(
final String payload,
String fabricTxId,
String channelName,